http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7f24cd3..aa0f08d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; @@ -140,7 +141,9 @@ public class StreamingJobGraphGenerator { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } - setChaining(hashes, legacyHashes); + Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>(); + + setChaining(hashes, legacyHashes, chainedOperatorHashes); setPhysicalEdges(); @@ -190,9 +193,9 @@ public class StreamingJobGraphGenerator { * * <p>This will recursively create all {@link JobVertex} instances. */ - private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) { + private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { for (Integer sourceNodeId : streamGraph.getSourceIDs()) { - createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0); + createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes); } } @@ -201,7 +204,8 @@ public class StreamingJobGraphGenerator { Integer currentNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, - int chainIndex) { + int chainIndex, + Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { if (!builtVertices.contains(startNodeId)) { @@ -220,20 +224,27 @@ public class StreamingJobGraphGenerator { for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll( - createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1)); + createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); } for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); - createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0); + createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); + } + + List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId); + if (operatorHashes == null) { + operatorHashes = new ArrayList<>(); + chainedOperatorHashes.put(startNodeId, operatorHashes); } + operatorHashes.add(new Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId))); chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs)); StreamConfig config = currentNodeId.equals(startNodeId) - ? createJobVertex(startNodeId, hashes, legacyHashes) + ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration()); setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); @@ -308,7 +319,8 @@ public class StreamingJobGraphGenerator { private StreamConfig createJobVertex( Integer streamNodeId, Map<Integer, byte[]> hashes, - List<Map<Integer, byte[]>> legacyHashes) { + List<Map<Integer, byte[]>> legacyHashes, + Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { JobVertex jobVertex; StreamNode streamNode = streamGraph.getStreamNode(streamNodeId); @@ -330,18 +342,32 @@ public class StreamingJobGraphGenerator { } } + List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId); + List<OperatorID> chainedOperatorVertexIds = new ArrayList<>(); + List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>(); + if (chainedOperators != null) { + for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) { + chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0)); + userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null); + } + } + if (streamNode.getInputFormat() != null) { jobVertex = new InputFormatVertex( chainedNames.get(streamNodeId), jobVertexId, - legacyJobVertexIds); + legacyJobVertexIds, + chainedOperatorVertexIds, + userDefinedChainedOperatorVertexIds); TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration()); taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat())); } else { jobVertex = new JobVertex( chainedNames.get(streamNodeId), jobVertexId, - legacyJobVertexIds); + legacyJobVertexIds, + chainedOperatorVertexIds, + userDefinedChainedOperatorVertexIds); } jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index 05aa694..a897674 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to @@ -392,10 +391,10 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { } /** - * Tests that a manual hash for an intermediate chain node throws an Exception. + * Tests that a manual hash for an intermediate chain node is accepted. */ - @Test(expected = UnsupportedOperationException.class) - public void testManualHashAssignmentForIntermediateNodeInChainThrowsException() throws Exception { + @Test + public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(4); @@ -409,9 +408,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { /** * Tests that a manual hash at the beginning of a chain is accepted. - * - * <p>This should work, because the ID is used at the beginning of a chain. This is currently - * not allowed for intermediate nodes (see {@link #testManualHashAssignmentForIntermediateNodeInChainThrowsException()}). */ @Test public void testManualHashAssignmentForStartNodeInInChain() throws Exception { @@ -446,7 +442,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { } @Test - public void testUserProvidedHashingOnChainNotSupported() { + public void testUserProvidedHashingOnChainSupported() { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") @@ -455,11 +451,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { .keyBy(new NoOpKeySelector()) .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd"); - try { - env.getStreamGraph().getJobGraph(); - fail(); - } catch (UnsupportedOperationException ignored) { - } + env.getStreamGraph().getJobGraph(); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index e0de7d2..72a1b63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -32,28 +32,32 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; -import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; @@ -93,7 +97,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -102,7 +108,6 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSav import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -310,30 +315,43 @@ public class SavepointITCase extends TestLogger { }; }}; + ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph(); + // - Verification START ------------------------------------------- String errMsg = "Error during gathering of TaskDeploymentDescriptors"; - assertNull(errMsg, error[0]); + if (error[0] != null) { + throw new RuntimeException(error[0]); + } + + Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>(); + for (ExecutionJobVertex task : graph.getVerticesTopologically()) { + List<OperatorID> operatorIDs = task.getOperatorIDs(); + for (int x = 0; x < operatorIDs.size(); x++) { + operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task)); + } + } // Verify that all tasks, which are part of the savepoint // have a matching task deployment descriptor. - for (TaskState taskState : savepoint.getTaskStates()) { - Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID()); + for (OperatorState operatorState : savepoint.getOperatorStates()) { + Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID()); + Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId()); errMsg = "Missing task for savepoint state for operator " - + taskState.getJobVertexID() + "."; + + operatorState.getOperatorID() + "."; assertTrue(errMsg, taskTdds.size() > 0); - assertEquals(taskState.getNumberCollectedStates(), taskTdds.size()); + assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size()); for (TaskDeploymentDescriptor tdd : taskTdds) { - SubtaskState subtaskState = taskState.getState(tdd.getSubtaskIndex()); + OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex()); assertNotNull(subtaskState); errMsg = "Initial operator state mismatch."; assertEquals(errMsg, subtaskState.getLegacyOperatorState(), - tdd.getTaskStateHandles().getLegacyOperatorState()); + tdd.getTaskStateHandles().getLegacyOperatorState().get(chainIndexAndJobVertex.f0)); } } @@ -360,15 +378,13 @@ public class SavepointITCase extends TestLogger { // The checkpoint files List<File> checkpointFiles = new ArrayList<>(); - for (TaskState stateForTaskGroup : savepoint.getTaskStates()) { - for (SubtaskState subtaskState : stateForTaskGroup.getStates()) { - ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getLegacyOperatorState(); + for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) { + for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) { + StreamStateHandle streamTaskState = subtaskState.getLegacyOperatorState(); - for (int i = 0; i < streamTaskState.getLength(); i++) { - if (streamTaskState.get(i) != null) { - FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i); - checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri())); - } + if (streamTaskState != null) { + FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState; + checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri())); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java new file mode 100644 index 0000000..2eecf49 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; +import org.apache.flink.runtime.testingUtils.TestingTaskManager; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Option; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.net.URL; +import java.util.concurrent.TimeUnit; + +/** + * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified + * from that point on. + * + * The verification is done in 2 Steps: + * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint. + * Step 2: Modify the job topology, and restore from the savepoint created in step 1. + */ +public abstract class AbstractOperatorRestoreTestBase { + + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static ActorSystem actorSystem = null; + private static ActorGateway jobManager = null; + private static ActorGateway archiver = null; + private static ActorGateway taskManager = null; + + private static final FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + + @BeforeClass + public static void setupCluster() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + + actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + new Configuration(), + actorSystem, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID); + archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID); + + Configuration tmConfig = new Configuration(); + tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + tmConfig, + ResourceID.generate(), + actorSystem, + "localhost", + Option.apply("tm"), + Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)), + true, + TestingTaskManager.class); + + taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); + + // Wait until connected + Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); + Await.ready(taskManager.ask(msg, timeout), timeout); + } + + @AfterClass + public static void tearDownCluster() { + if (actorSystem != null) { + actorSystem.shutdown(); + } + + if (archiver != null) { + archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (taskManager != null) { + taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + + @Test + public void testMigrationAndRestore() throws Throwable { + // submit 1.2 job and create a migrated 1.3 savepoint + String savepointPath = migrateJob(); + // restore from migrated 1.3 savepoint + restoreJob(savepointPath); + } + + private String migrateJob() throws Throwable { + URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName()); + if (savepointResource == null) { + throw new IllegalArgumentException("Savepoint file does not exist."); + } + JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE); + jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile())); + + Object msg; + Object result; + + // Submit job graph + msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED); + result = Await.result(jobManager.ask(msg, timeout), timeout); + + if (result instanceof JobManagerMessages.JobResultFailure) { + JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result; + throw new Exception(failure.cause()); + } + Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass()); + + // Wait for all tasks to be running + msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID()); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Trigger savepoint + File targetDirectory = tmpFolder.newFolder(); + msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath()); + Future<Object> future = jobManager.ask(msg, timeout); + result = Await.result(future, timeout); + + if (result instanceof JobManagerMessages.CancellationFailure) { + JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result; + throw new Exception(failure.cause()); + } + + String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath(); + + // Wait until canceled + msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED); + Await.ready(jobManager.ask(msg, timeout), timeout); + + return savepointPath; + } + + private void restoreJob(String savepointPath) throws Exception { + JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE); + jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true)); + + Object msg; + Object result; + + // Submit job graph + msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED); + result = Await.result(jobManager.ask(msg, timeout), timeout); + + if (result instanceof JobManagerMessages.JobResultFailure) { + JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result; + throw new Exception(failure.cause()); + } + Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass()); + + msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID()); + JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status(); + while (!status.isTerminalState()) { + status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status(); + } + + Assert.assertEquals(JobStatus.FINISHED, status); + } + + private JobGraph createJobGraph(ExecutionMode mode) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStateBackend(new MemoryStateBackend()); + + switch (mode) { + case MIGRATE: + createMigrationJob(env); + break; + case RESTORE: + createRestoredJob(env); + break; + } + + return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + } + + /** + * Recreates the job used to create the 1.2 savepoint. + * + * @param env StreamExecutionEnvironment to use + */ + protected abstract void createMigrationJob(StreamExecutionEnvironment env); + + /** + * Creates a modified version of the job used to create the 1.2 savepoint. + * + * @param env StreamExecutionEnvironment to use + */ + protected abstract void createRestoredJob(StreamExecutionEnvironment env); + + /** + * Returns the name of the savepoint directory to use, relative to "resources/operatorstate". + * + * @return savepoint directory to use + */ + protected abstract String getMigrationSavepointName(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java new file mode 100644 index 0000000..f333aca --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore; + +/** + * Enum to control function behavior for the different test stages. + * + * {@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint. + * {@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3. + * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint. + */ +public enum ExecutionMode { + GENERATE, + MIGRATE, + RESTORE +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java new file mode 100644 index 0000000..28cd15a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.keyed; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase { + + @Override + public void createMigrationJob(StreamExecutionEnvironment env) { + /** + * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2) + */ + SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE); + + SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source); + + SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window); + + SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first); + } + + @Override + protected void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1 + */ + SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source); + + SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window); + + SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second); + first.startNewChain(); + } + + @Override + protected final String getMigrationSavepointName() { + return "complexKeyed"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java new file mode 100644 index 0000000..6add7b2 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.keyed; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.test.state.operator.restore.ExecutionMode; +import org.apache.flink.util.Collector; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}. + * + * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation. + */ +public class KeyedJob { + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + String savepointsPath = pt.getRequired("savepoint-path"); + + Configuration config = new Configuration(); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + env.setRestartStrategy(RestartStrategies.noRestart()); + + env.setStateBackend(new MemoryStateBackend()); + + /** + * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2) + */ + + SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = createIntegerTupleSource(env, ExecutionMode.GENERATE); + + SingleOutputStreamOperator<Integer> window = createWindowFunction(ExecutionMode.GENERATE, source); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, window); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first); + + env.execute("job"); + } + + public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> createIntegerTupleSource(StreamExecutionEnvironment env, ExecutionMode mode) { + return env.addSource(new IntegerTupleSource(mode)); + } + + public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) { + return input + .keyBy(0) + .countWindow(1) + .apply(new StatefulWindowFunction(mode)) + .setParallelism(4) + .uid("window"); + } + + public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) { + SingleOutputStreamOperator<Integer> map = input + .map(new StatefulStringStoringMap(mode, "first")) + .setParallelism(4); + + // TODO: re-enable this when generating the actual 1.2 savepoint + //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { + map.uid("first"); + //} + + return map; + } + + public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) { + SingleOutputStreamOperator<Integer> map = input + .map(new StatefulStringStoringMap(mode, "second")) + .setParallelism(4); + + // TODO: re-enable this when generating the actual 1.2 savepoint + //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { + map.uid("second"); + //} + + return map; + } + + private static final class IntegerTupleSource extends RichSourceFunction<Tuple2<Integer, Integer>> { + + private static final long serialVersionUID = 1912878510707871659L; + private final ExecutionMode mode; + + private boolean running = true; + + private IntegerTupleSource(ExecutionMode mode) { + this.mode = mode; + } + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + for (int x = 0; x < 10; x++) { + ctx.collect(new Tuple2<>(x, x)); + } + + switch (mode) { + case GENERATE: + case MIGRATE: + synchronized (this) { + while (running) { + this.wait(); + } + } + } + } + + @Override + public void cancel() { + synchronized (this) { + running = false; + this.notifyAll(); + } + } + } + + private static final class StatefulWindowFunction extends RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> { + + private static final long serialVersionUID = -7236313076792964055L; + + private final ExecutionMode mode; + private transient ListState<Integer> state; + + private boolean applyCalled = false; + + private StatefulWindowFunction(ExecutionMode mode) { + this.mode = mode; + } + + @Override + public void open(Configuration config) { + this.state = getRuntimeContext().getListState(new ListStateDescriptor<>("values", Integer.class)); + } + + @Override + public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<Integer, Integer>> values, Collector<Integer> out) throws Exception { + // fail-safe to make sure apply is actually called + applyCalled = true; + switch (mode) { + case GENERATE: + for (Tuple2<Integer, Integer> value : values) { + state.add(value.f1); + } + break; + case MIGRATE: + case RESTORE: + Iterator<Tuple2<Integer, Integer>> input = values.iterator(); + Iterator<Integer> restored = state.get().iterator(); + while (input.hasNext() && restored.hasNext()) { + Tuple2<Integer, Integer> value = input.next(); + Integer rValue = restored.next(); + Assert.assertEquals(rValue, value.f1); + } + Assert.assertEquals(restored.hasNext(), input.hasNext()); + } + } + + @Override + public void close() { + Assert.assertTrue("Apply was never called.", applyCalled); + } + } + + private static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> { + + private static final long serialVersionUID = 6092985758425330235L; + private final ExecutionMode mode; + private final String valueToStore; + + private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) { + this.mode = mode; + this.valueToStore = valueToStore; + } + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + + @Override + public List<String> snapshotState(long checkpointId, long timestamp) throws Exception { + return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask()); + } + + @Override + public void restoreState(List<String> state) throws Exception { + switch (mode) { + case GENERATE: + break; + case MIGRATE: + case RESTORE: + Assert.assertEquals("Failed for " + valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size()); + String value = state.get(0); + Assert.assertEquals(valueToStore + getRuntimeContext().getIndexOfThisSubtask(), value); + } + } + } + + + private KeyedJob() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java new file mode 100644 index 0000000..5b51765 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * All classes extending this class will use the same savepoint and migration job. + */ +public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase { + + @Override + public void createMigrationJob(StreamExecutionEnvironment env) { + /** + * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.MIGRATE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.MIGRATE, first); + second.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.MIGRATE, stateless); + } + + @Override + protected final String getMigrationSavepointName() { + return "nonKeyed"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java new file mode 100644 index 0000000..6838070 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operators is restored if a topology change breaks up a chain. + */ +public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase { + + @Override + public void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) -> StatefulMap3 + */ + DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first); + second.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless); + third.startNewChain(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java new file mode 100644 index 0000000..e405e76 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operators is restored if a topology change removes an operator from a chain. + */ +public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase { + + @Override + public void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(first); + stateless.startNewChain(); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java new file mode 100644 index 0000000..b78aa10 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operator is restored if a topology change adds an operator to a chain. + */ +public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase { + + @Override + public void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3 -> StatefulMap4) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first); + second.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second); + + SingleOutputStreamOperator<Integer> stateless2 = createStatelessMap(stateless); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless2); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java new file mode 100644 index 0000000..7c68b4e --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operators is restored if a topology change causes the ordering of a chain to change. + */ +public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase { + + @Override + public void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, first); + third.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(third); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, stateless); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java new file mode 100644 index 0000000..3f2fba4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.state.operator.restore.ExecutionMode; + +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap; +import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap; + +/** + * Verifies that the state of all operators is restored if a topology change creates a new chain. + */ +public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase { + + @Override + public void createRestoredJob(StreamExecutionEnvironment env) { + /** + * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java new file mode 100644 index 0000000..32067b3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state.operator.restore.unkeyed; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.state.operator.restore.ExecutionMode; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.List; + +/** + * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}. + * + * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation. + */ +public class NonKeyedJob { + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + String savepointsPath = pt.getRequired("savepoint-path"); + + Configuration config = new Configuration(); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + env.setRestartStrategy(RestartStrategies.noRestart()); + + env.setStateBackend(new MemoryStateBackend()); + + /** + * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3) + */ + DataStream<Integer> source = createSource(env, ExecutionMode.GENERATE); + + SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source); + first.startNewChain(); + + SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first); + second.startNewChain(); + + SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second); + + SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.GENERATE, stateless); + + env.execute("job"); + } + + public static SingleOutputStreamOperator<Integer> createSource(StreamExecutionEnvironment env, ExecutionMode mode) { + return env.addSource(new IntegerSource(mode)) + .setParallelism(4); + } + + public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) { + return input + .map(new StatefulStringStoringMap(mode, "first")) + .setParallelism(4) + .uid("first"); + } + + public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) { + return input + .map(new StatefulStringStoringMap(mode, "second")) + .setParallelism(4) + .uid("second"); + } + + public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) { + SingleOutputStreamOperator<Integer> map = input + .map(new StatefulStringStoringMap(mode, "third")) + .setParallelism(4); + + // we cannot set the uid on a chained operator in 1.2 + if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { + map.uid("third"); + } + + return map; + } + + public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) { + return input.map(new NoOpMapFunction()) + .setParallelism(4); + } + + private static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> { + + private static final long serialVersionUID = 6092985758425330235L; + private final ExecutionMode mode; + private final String valueToStore; + + private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) { + this.mode = mode; + this.valueToStore = valueToStore; + } + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + + @Override + public List<String> snapshotState(long checkpointId, long timestamp) throws Exception { + return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask()); + } + + @Override + public void restoreState(List<String> state) throws Exception { + switch (mode) { + case GENERATE: + break; + case MIGRATE: + case RESTORE: + Assert.assertEquals("Failed for " + valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size()); + String value = state.get(0); + Assert.assertEquals(valueToStore + getRuntimeContext().getIndexOfThisSubtask(), value); + } + } + } + + private static class NoOpMapFunction implements MapFunction<Integer, Integer> { + + private static final long serialVersionUID = 6584823409744624276L; + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + } + + private static final class IntegerSource extends RichParallelSourceFunction<Integer> { + + private static final long serialVersionUID = 1912878510707871659L; + private final ExecutionMode mode; + + private volatile boolean running = true; + + private IntegerSource(ExecutionMode mode) { + this.mode = mode; + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + ctx.collect(1); + + switch (mode) { + case GENERATE: + case MIGRATE: + // keep the job running until cancel-with-savepoint was done + synchronized (this) { + while (running) { + this.wait(); + } + } + } + } + + @Override + public void cancel() { + synchronized (this) { + running = false; + this.notifyAll(); + } + } + } + + private NonKeyedJob() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata new file mode 100644 index 0000000..9e03876 Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata new file mode 100644 index 0000000..8fcd1ea Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata differ