This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c3a9293 [FLINK-10712] Support state restore for RestartPipelinedRegionStrategy c3a9293 is described below commit c3a929346f9993d5177978053fc0bedd482faf7c Author: Yun Tang <myas...@live.com> AuthorDate: Wed Apr 17 19:48:40 2019 +0800 [FLINK-10712] Support state restore for RestartPipelinedRegionStrategy This closes #7813. --- .../fs/RollingSinkFaultToleranceITCase.java | 12 +- .../BucketingSinkFaultToleranceITCase.java | 12 +- .../runtime/checkpoint/CheckpointCoordinator.java | 15 +- .../flink/runtime/executiongraph/Execution.java | 2 - .../executiongraph/failover/FailoverRegion.java | 28 +- .../failover/RestartPipelinedRegionStrategy.java | 18 +- ...ncurrentFailoverStrategyExecutionGraphTest.java | 4 +- .../runtime/executiongraph/FailoverRegionTest.java | 219 +++++++++++- .../ContinuousFileProcessingCheckpointITCase.java | 23 +- .../test/checkpointing/RegionFailoverITCase.java | 394 +++++++++++++++++++++ .../StreamFaultToleranceTestBase.java | 87 ++++- 11 files changed, 748 insertions(+), 66 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java index 886055d..daa8884 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java @@ -32,9 +32,9 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; 
+import org.junit.Before; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; @@ -77,8 +77,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas private static String outPath; - @BeforeClass - public static void createHDFS() throws IOException { + @Before + public void createHDFS() throws IOException { Configuration conf = new Configuration(); File dataDir = tempFolder.newFolder(); @@ -94,8 +94,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas + "/string-non-rolling-out"; } - @AfterClass - public static void destroyHDFS() { + @After + public void destroyHDFS() { if (hdfsCluster != null) { hdfsCluster.shutdown(); } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java index dcb77bf..71b210d 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java @@ -32,9 +32,9 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; @@ -77,8 +77,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB private static String outPath; - @BeforeClass - public static void createHDFS() throws IOException { + @Before + public void createHDFS() throws 
IOException { Configuration conf = new Configuration(); File dataDir = tempFolder.newFolder(); @@ -94,8 +94,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB + "/string-non-rolling-out"; } - @AfterClass - public static void destroyHDFS() { + @After + public void destroyHDFS() { if (hdfsCluster != null) { hdfsCluster.shutdown(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index ac4c1e2..b6cbc66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1202,12 +1202,23 @@ public class CheckpointCoordinator { currentPeriodicTrigger = null; } + abortPendingCheckpoints(new Exception("Checkpoint Coordinator is suspending.")); + + numUnsuccessfulCheckpointsTriggers.set(0); + } + } + + /** + * Aborts all the pending checkpoints due to an exception. + * @param exception The exception. 
+ */ + public void abortPendingCheckpoints(Exception exception) { + synchronized (lock) { for (PendingCheckpoint p : pendingCheckpoints.values()) { - p.abortError(new Exception("Checkpoint Coordinator is suspending.")); + p.abortError(exception); } pendingCheckpoints.clear(); - numUnsuccessfulCheckpointsTriggers.set(0); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index dd8db3a..59f79a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -89,7 +89,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * A single execution of a vertex. 
While an {@link ExecutionVertex} can be executed multiple times @@ -365,7 +364,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * @param taskRestore information to restore the state */ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { - checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED"); this.taskRestore = taskRestore; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index 6235750..635a7f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -23,9 +23,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.util.AbstractID; @@ -38,6 +40,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -64,15 +67,19 @@ public class FailoverRegion { private final List<ExecutionVertex> 
connectedExecutionVertexes; + private final Map<JobVertexID, ExecutionJobVertex> tasks; + /** Current status of the job execution */ private volatile JobStatus state = JobStatus.RUNNING; public FailoverRegion( ExecutionGraph executionGraph, - List<ExecutionVertex> connectedExecutions) { + List<ExecutionVertex> connectedExecutions, + Map<JobVertexID, ExecutionJobVertex> tasks) { this.executionGraph = checkNotNull(executionGraph); this.connectedExecutionVertexes = checkNotNull(connectedExecutions); + this.tasks = checkNotNull(tasks); LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions); } @@ -108,14 +115,7 @@ public class FailoverRegion { return state; } - /** - * get all execution vertexes contained in this region - */ - public List<ExecutionVertex> getAllExecutionVertexes() { - return connectedExecutionVertexes; - } - - // Notice the region to failover, + // Notice the region to failover, private void failover(long globalModVersionOfFailover) { if (!executionGraph.getRestartStrategy().canRestart()) { executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail")); @@ -206,13 +206,15 @@ public class FailoverRegion { try { if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { // if we have checkpointed state, reload it into the executions - //TODO: checkpoint support restore part ExecutionVertex cp - /** if (executionGraph.getCheckpointCoordinator() != null) { + // we restart the checkpoint scheduler for + // i) enable new checkpoint could be triggered without waiting for last checkpoint expired. + // ii) ensure the EXACTLY_ONCE semantics if needed. 
+ executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new Exception("FailoverRegion is restarting.")); + executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState( - connectedExecutionVertexes, false, false); + tasks, false, true); } - */ HashSet<AllocationID> previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size()); for (ExecutionVertex connectedExecutionVertex : connectedExecutionVertexes) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java index 25fd290..811c0a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; @@ -36,11 +37,12 @@ import java.util.Arrays; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A failover strategy that restarts regions of the ExecutionGraph. A region is defined + * A failover strategy that restarts regions of the ExecutionGraph with state. A region is defined * by this strategy as the weakly connected component of tasks that communicate via pipelined * data exchange. 
*/ @@ -222,7 +224,19 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy { @VisibleForTesting protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) { - return new FailoverRegion(eg, connectedExecutions); + Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions); + return new FailoverRegion(eg, connectedExecutions, tasks); + } + + @VisibleForTesting + protected Map<JobVertexID, ExecutionJobVertex> initTasks(List<ExecutionVertex> connectedExecutions) { + Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(connectedExecutions.size()); + for (ExecutionVertex executionVertex : connectedExecutions) { + JobVertexID jobvertexId = executionVertex.getJobvertexId(); + ExecutionJobVertex jobVertex = executionVertex.getJobVertex(); + tasks.putIfAbsent(jobvertexId, jobVertex); + } + return tasks; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java index 697b774..9ac72a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import 
org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; @@ -521,7 +522,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger { @Override protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) { - return new FailoverRegion(eg, connectedExecutions) { + Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions); + return new FailoverRegion(eg, connectedExecutions, tasks) { @Override protected CompletableFuture<Void> createTerminationFutureOverAllConnectedVertexes() { ArrayList<CompletableFuture<?>> terminationAndBlocker = new ArrayList<>(2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index 240d236..e555704 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -19,8 +19,25 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; @@ -28,54 +45,85 @@ import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionSt import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import 
org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Ignore; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class FailoverRegionTest extends TestLogger { + private static final long checkpointId = 42L; + /** - * Tests that a job only has one failover region and can recover from task failure successfully - * @throws Exception + * Tests that a job only has one failover region and can recover from task failure successfully with state. + * @throws Exception if fail to create the single region execution graph or fail to acknowledge all checkpoints. 
*/ @Test public void testSingleRegionFailover() throws Exception { RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(10); ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy); - RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + assertNotNull(eg.getCheckpointCoordinator()); + assertFalse(eg.getCheckpointCoordinator().getPendingCheckpoints().isEmpty()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState()); + acknowledgeAllCheckpoints(eg.getCheckpointCoordinator(), eg.getAllExecutionVertices().iterator()); + + // verify checkpoint has been completed successfully. + assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints()); + assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID()); + ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState()); for (ExecutionVertex evs : eg.getAllExecutionVertices()) { evs.getCurrentExecutionAttempt().completeCancelling(); } + + verifyCheckpointRestoredAsExpected(eg); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState()); } @@ -98,8 +146,9 @@ public class FailoverRegionTest extends TestLogger { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; - final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20); - + final Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap = new HashMap<>(); + final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20, new CollectTddTaskManagerGateway(attemptIDInitStateMap)); + JobVertex v1 = new JobVertex("vertex1"); JobVertex v2 = new JobVertex("vertex2"); 
JobVertex v3 = new JobVertex("vertex3"); @@ -133,9 +182,8 @@ public class FailoverRegionTest extends TestLogger { slotProvider); eg.attachJobGraph(ordered); - eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); - RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); // the following two vertices are in the same failover region ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; @@ -148,11 +196,21 @@ public class FailoverRegionTest extends TestLogger { // the following vertices are in one failover region ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; - ExecutionVertex ev4 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev4 = eg.getJobVertex(v4.getID()).getTaskVertices()[0]; + + enableCheckpointing(eg); + eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); eg.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, eg.getState()); + + attachPendingCheckpoints(eg); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev21).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + acknowledgeAllCheckpoints(eg.getCheckpointCoordinator(), Arrays.asList(ev11, ev21, ev12, ev22, ev31, ev32, ev4).iterator()); ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, Collections.emptySet()); ev21.getCurrentExecutionAttempt().fail(new Exception("New fail")); @@ -161,6 +219,8 @@ public class FailoverRegionTest extends TestLogger { assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); ev11.getCurrentExecutionAttempt().completeCancelling(); + 
verifyCheckpointRestoredAsExpected(eg); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); @@ -182,14 +242,18 @@ public class FailoverRegionTest extends TestLogger { assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState()); ev32.getCurrentExecutionAttempt().completeCancelling(); + verifyCheckpointRestoredAsExpected(eg); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev4).getState()); } /** - * Tests that when a task fail, and restart strategy doesn't support restarting, the job will go to failed - * @throws Exception + * Tests that when a task fail, and restart strategy doesn't support restarting, the job will go to failed. + * @throws Exception if fail to create the single region execution graph. */ @Test public void testNoManualRestart() throws Exception { @@ -207,8 +271,9 @@ public class FailoverRegionTest extends TestLogger { } /** - * Tests that two failover regions failover at the same time, they will not influence each other - * @throws Exception + * Tests that two regions failover at the same time, they will not influence each other. + * @throws Exception if fail to create dummy job information, fail to schedule for execution + * or timeout before region switches to expected status. 
*/ @Test public void testMultiRegionFailoverAtSameTime() throws Exception { @@ -282,7 +347,7 @@ public class FailoverRegionTest extends TestLogger { * Tests that if a task reports the result of its preceding task is failed, * its preceding task will be considered as failed, and start to failover * TODO: as the report part is not finished yet, this case is ignored temporarily - * @throws Exception + * @throws Exception if fail to create dummy job information or fail to schedule for execution. */ @Ignore @Test @@ -335,8 +400,8 @@ public class FailoverRegionTest extends TestLogger { } /** - * Tests that a new failure comes while the failover region is in CANCELLING - * @throws Exception + * Tests that a new failure comes while the failover region is in CANCELLING. + * @throws Exception if fail to create the single region execution graph. */ @Test public void testFailWhileCancelling() throws Exception { @@ -359,8 +424,8 @@ public class FailoverRegionTest extends TestLogger { } /** - * Tests that a new failure comes while the failover region is restarting - * @throws Exception + * Tests that a new failure comes while the failover region is restarting. + * @throws Exception if fail to create the single region execution graph. */ @Test public void testFailWhileRestarting() throws Exception { @@ -384,7 +449,19 @@ public class FailoverRegionTest extends TestLogger { assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); } - private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception { + // -------------------------------------------------------------------------------------------- + + private void verifyCheckpointRestoredAsExpected(ExecutionGraph eg) throws Exception { + // pending checkpoints have already been cancelled. 
+ assertNotNull(eg.getCheckpointCoordinator()); + assertTrue(eg.getCheckpointCoordinator().getPendingCheckpoints().isEmpty()); + + // verify checkpoint has been restored successfully. + assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints()); + assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID()); + } + + private ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; @@ -425,8 +502,13 @@ public class FailoverRegionTest extends TestLogger { e.printStackTrace(); fail("Job failed with exception: " + e.getMessage()); } + + enableCheckpointing(eg); + eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); eg.scheduleForExecution(); + + attachPendingCheckpoints(eg); return eg; } @@ -444,4 +526,105 @@ public class FailoverRegionTest extends TestLogger { } } + private static void enableCheckpointing(ExecutionGraph eg) { + ArrayList<ExecutionJobVertex> jobVertices = new ArrayList<>(eg.getAllVertices().values()); + eg.enableCheckpointing( + 1000, + 100, + 0, + 1, + CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, + jobVertices, + jobVertices, + jobVertices, + Collections.emptyList(), + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + new CheckpointStatsTracker( + 0, + jobVertices, + mock(CheckpointCoordinatorConfiguration.class), + new UnregisteredMetricsGroup())); + } + + /** + * Attach pending checkpoints of chk-42 and chk-43 to the execution graph. + * If {@link #acknowledgeAllCheckpoints(CheckpointCoordinator, Iterator)} called then, + * chk-42 would become the completed checkpoint. 
+ */ + private void attachPendingCheckpoints(ExecutionGraph eg) throws IOException { + final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>(); + final Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm = new HashMap<>(); + eg.getAllExecutionVertices().forEach(e -> { + Execution ee = e.getCurrentExecutionAttempt(); + if (ee != null) { + verticesToConfirm.put(ee.getAttemptId(), e); + } + }); + + CheckpointCoordinator checkpointCoordinator = eg.getCheckpointCoordinator(); + assertNotNull(checkpointCoordinator); + CheckpointStorageCoordinatorView checkpointStorage = checkpointCoordinator.getCheckpointStorage(); + pendingCheckpoints.put(checkpointId, new PendingCheckpoint( + eg.getJobID(), + checkpointId, + 0L, + verticesToConfirm, + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointStorage.initializeLocationForCheckpoint(checkpointId), + eg.getFutureExecutor())); + + long newCheckpointId = checkpointId + 1; + pendingCheckpoints.put(newCheckpointId, new PendingCheckpoint( + eg.getJobID(), + newCheckpointId, + 0L, + verticesToConfirm, + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointStorage.initializeLocationForCheckpoint(newCheckpointId), + eg.getFutureExecutor())); + Whitebox.setInternalState(checkpointCoordinator, "pendingCheckpoints", pendingCheckpoints); + } + + /** + * Lets the checkpoint coordinator receive acknowledgements from all given execution vertices so that the expected checkpoint can complete. 
+ */ + private void acknowledgeAllCheckpoints(CheckpointCoordinator checkpointCoordinator, Iterator<ExecutionVertex> executionVertexes) throws IOException, CheckpointException { + while (executionVertexes.hasNext()) { + ExecutionVertex executionVertex = executionVertexes.next(); + for (int index = 0; index < executionVertex.getJobVertex().getParallelism(); index++) { + JobVertexID jobVertexID = executionVertex.getJobvertexId(); + OperatorStateHandle opStateBackend = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, null, null, null); + TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); + taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState); + + AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( + executionVertex.getJobId(), + executionVertex.getJobVertex().getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), + checkpointId, + new CheckpointMetrics(), + taskOperatorSubtaskStates); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint); + } + } + } + + private static class CollectTddTaskManagerGateway extends SimpleAckingTaskManagerGateway { + + private final Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap; + + CollectTddTaskManagerGateway(Map<ExecutionAttemptID, JobManagerTaskRestore> attemptIDInitStateMap) { + this.attemptIDInitStateMap = attemptIDInitStateMap; + } + + @Override + public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) { + attemptIDInitStateMap.put(tdd.getExecutionAttemptId(), tdd.getTaskRestore()); + return super.submitTask(tdd, timeout); + } + } + } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index 280e11a..cbe99f4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -38,9 +38,10 @@ import org.apache.flink.util.Collector; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.AssumptionViolatedException; +import org.junit.Before; import java.io.File; import java.io.IOException; @@ -74,8 +75,14 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>(); - @BeforeClass - public static void createHDFS() { + @Before + public void createHDFS() { + if (failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionStrategy)) { + // TODO the 'NO_OF_RETRIES' is useless for current RestartPipelinedRegionStrategy, + // for this ContinuousFileProcessingCheckpointITCase, using RestartPipelinedRegionStrategy would result in endless running. 
+ throw new AssumptionViolatedException("ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy"); + } + try { baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile(); FileUtil.fullyDelete(baseDir); @@ -91,10 +98,12 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran } } - @AfterClass - public static void destroyHDFS() { + @After + public void destroyHDFS() { try { - FileUtil.fullyDelete(baseDir); + if (baseDir != null) { + FileUtil.fullyDelete(baseDir); + } } catch (Throwable t) { throw new RuntimeException(t); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java new file mode 100644 index 0000000..b951b48 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.CollectionUtils; +import org.junit.AfterClass; +import org.junit.Assert; 
+import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for region failover with multi regions. + */ +public class RegionFailoverITCase extends TestLogger { + + private static final int FAIL_BASE = 1000; + private static final int NUM_OF_REGIONS = 3; + private static final int MAX_PARALLELISM = 2 * NUM_OF_REGIONS; + private static final Set<Integer> EXPECTED_INDICES = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet()); + private static final int NUM_OF_RESTARTS = 3; + private static final int NUM_ELEMENTS = FAIL_BASE * 10; + + private static AtomicLong lastCompletedCheckpointId = new AtomicLong(0); + private static AtomicInteger numCompletedCheckpoints = new AtomicInteger(0); + private static AtomicInteger jobFailedCnt = new AtomicInteger(0); + + private static Map<Long, Integer> snapshotIndicesOfSubTask = new HashMap<>(); + + private static MiniClusterWithClientResource cluster; + + private static boolean restoredState = false; + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Before + public void setup() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2).build()); + cluster.before(); + jobFailedCnt.set(0); + 
numCompletedCheckpoints.set(0); + } + + @AfterClass + public static void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + /** + * Tests that a simple job (Source -> Map) with multi regions could restore with operator state. + * + * <p>The last subtask of Map function in the 1st stream graph would fail {@code NUM_OF_RESTARTS} times, + * and it will verify whether the restored state is identical to last completed checkpoint's. + */ + @Test(timeout = 60000) + public void testMultiRegionFailover() { + try { + JobGraph jobGraph = createJobGraph(); + ClusterClient<?> client = cluster.getClusterClient(); + client.submitJob(jobGraph, RegionFailoverITCase.class.getClassLoader()); + verifyAfterJobExecuted(); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private void verifyAfterJobExecuted() { + Assert.assertTrue("The test multi-region job has never ever restored state.", restoredState); + + int keyCount = 0; + for (Map<Integer, Integer> map : ValidatingSink.maps) { + for (Map.Entry<Integer, Integer> entry : map.entrySet()) { + assertEquals(4 * entry.getKey() + 1, (int) entry.getValue()); + keyCount += 1; + } + } + assertEquals(NUM_ELEMENTS / 2, keyCount); + } + + private JobGraph createJobGraph() { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_OF_REGIONS); + env.setMaxParallelism(MAX_PARALLELISM); + env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.disableOperatorChaining(); + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NUM_OF_RESTARTS, 0L)); + env.getConfig().disableSysoutLogging(); + + // Use DataStreamUtils#reinterpretAsKeyed to avoid merge regions and this stream graph would exist num of 'NUM_OF_REGIONS' individual regions. 
+ DataStreamUtils.reinterpretAsKeyedStream( + env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)) + .setParallelism(NUM_OF_REGIONS), + (KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0, + TypeInformation.of(Integer.class)) + .map(new FailingMapperFunction(NUM_OF_RESTARTS)) + .setParallelism(NUM_OF_REGIONS) + .addSink(new ValidatingSink()) + .setParallelism(NUM_OF_REGIONS); + + // another stream graph totally disconnected with the above one. + env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).setParallelism(1) + .map((MapFunction<Tuple2<Integer, Integer>, Object>) value -> value).setParallelism(1); + + return env.getStreamGraph().getJobGraph(); + } + + private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<Tuple2<Integer, Integer>> + implements CheckpointListener, CheckpointedFunction { + private static final long serialVersionUID = 1L; + + private final long numElements; + private final long checkpointLatestAt; + + private int index = -1; + + private int lastRegionIndex = -1; + + private volatile boolean isRunning = true; + + private ListState<Integer> listState; + + private static final ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("list-1", Integer.class); + + private ListState<Integer> unionListState; + + private static final ListStateDescriptor<Integer> unionStateDescriptor = new ListStateDescriptor<>("list-2", Integer.class); + + StringGeneratingSourceFunction(long numElements, long checkpointLatestAt) { + this.numElements = numElements; + this.checkpointLatestAt = checkpointLatestAt; + } + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + if (index < 0) { + // not been restored, so initialize + index = 0; + } + + int subTaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + while (isRunning && index < numElements) { + + synchronized 
(ctx.getCheckpointLock()) { + int key = index / 2; + int forwardTaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, MAX_PARALLELISM, NUM_OF_REGIONS); + // pre-partition output keys + if (forwardTaskIndex == subTaskIndex) { + // we would send data with the same key twice. + ctx.collect(Tuple2.of(key, index)); + } + index += 1; + } + + if (numCompletedCheckpoints.get() < 3) { + // not yet completed enough checkpoints, so slow down + if (index < checkpointLatestAt) { + // mild slow down + Thread.sleep(1); + } else { + // wait until the checkpoints are completed + while (isRunning && numCompletedCheckpoints.get() < 3) { + Thread.sleep(300); + } + } + } + if (jobFailedCnt.get() < NUM_OF_RESTARTS) { + // slow down if job has not failed for 'NUM_OF_RESTARTS' times. + Thread.sleep(1); + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (getRuntimeContext().getIndexOfThisSubtask() == NUM_OF_REGIONS - 1) { + lastCompletedCheckpointId.set(checkpointId); + snapshotIndicesOfSubTask.put(checkpointId, lastRegionIndex); + numCompletedCheckpoints.incrementAndGet(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + if (indexOfThisSubtask != 0) { + listState.clear(); + listState.add(index); + if (indexOfThisSubtask == NUM_OF_REGIONS - 1) { + lastRegionIndex = index; + } + } + unionListState.clear(); + unionListState.add(indexOfThisSubtask); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + restoredState = true; + + unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor); + Set<Integer> actualIndices = 
StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toSet()); + Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES, actualIndices)); + + if (indexOfThisSubtask == 0) { + listState = context.getOperatorStateStore().getListState(stateDescriptor); + Assert.assertTrue("list state should be empty for subtask-0", + ((List<Integer>) listState.get()).isEmpty()); + } else { + listState = context.getOperatorStateStore().getListState(stateDescriptor); + Assert.assertTrue("list state should not be empty for subtask-" + indexOfThisSubtask, + ((List<Integer>) listState.get()).size() > 0); + + if (indexOfThisSubtask == NUM_OF_REGIONS - 1) { + index = listState.get().iterator().next(); + if (index != snapshotIndicesOfSubTask.get(lastCompletedCheckpointId.get())) { + throw new RuntimeException("Test failed due to unexpected recovered index: " + index + + ", while last completed checkpoint record index: " + snapshotIndicesOfSubTask.get(lastCompletedCheckpointId.get())); + } + } + } + } else { + unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor); + + if (indexOfThisSubtask != 0) { + listState = context.getOperatorStateStore().getListState(stateDescriptor); + } + } + + } + } + + private static class FailingMapperFunction extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { + private final int restartTimes; + private ValueState<Integer> valueState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value", Integer.class)); + } + + FailingMapperFunction(int restartTimes) { + this.restartTimes = restartTimes; + } + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> input) throws Exception { + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + + if (input.f1 > FAIL_BASE * (jobFailedCnt.get() + 1)) { + + // we 
would let region-0 to failover first + if (jobFailedCnt.get() < 1 && indexOfThisSubtask == 0) { + jobFailedCnt.incrementAndGet(); + throw new TestException(); + } + + // then let last region to failover + if (jobFailedCnt.get() < restartTimes && indexOfThisSubtask == NUM_OF_REGIONS - 1) { + jobFailedCnt.incrementAndGet(); + throw new TestException(); + } + } + + // take input (1, 2) and (1, 3) for example, we would finally emit (1, 5) out with the usage of keyed state. + Integer value = valueState.value(); + if (value == null) { + valueState.update(input.f1); + return input; + } else { + return Tuple2.of(input.f0, value + input.f1); + } + } + } + + private static class ValidatingSink extends RichSinkFunction<Tuple2<Integer, Integer>> + implements ListCheckpointed<HashMap<Integer, Integer>> { + + @SuppressWarnings("unchecked") + private static Map<Integer, Integer>[] maps = (Map<Integer, Integer>[]) new Map<?, ?>[NUM_OF_REGIONS]; + + private HashMap<Integer, Integer> counts = new HashMap<>(); + + @Override + public void invoke(Tuple2<Integer, Integer> input) { + counts.merge(input.f0, input.f1, Math::max); + } + + @Override + public void close() throws Exception { + maps[getRuntimeContext().getIndexOfThisSubtask()] = counts; + } + + @Override + public List<HashMap<Integer, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.counts); + } + + @Override + public void restoreState(List<HashMap<Integer, Integer>> state) throws Exception { + if (state.size() != 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.counts.putAll(state.get(0)); + } + } + + private static class TestException extends IOException{ + private static final long serialVersionUID = 1L; + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 0ace56f5..6fae020 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -19,33 +19,84 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.test.util.TestUtils; +import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.TestLogger; +import org.junit.After; import org.junit.Assert; -import org.junit.ClassRule; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.fail; /** * Test base for fault tolerant streaming programs. */ +@RunWith(Parameterized.class) public abstract class StreamFaultToleranceTestBase extends TestLogger { + @Parameterized.Parameters(name = "FailoverStrategy: {0}") + public static Collection<FailoverStrategy> parameters() { + return Arrays.asList(FailoverStrategy.RestartAllStrategy, FailoverStrategy.RestartPipelinedRegionStrategy); + } + + /** + * The failover strategy to use. 
+ */ + public enum FailoverStrategy{ + RestartAllStrategy, + RestartPipelinedRegionStrategy + } + + @Parameterized.Parameter + public FailoverStrategy failoverStrategy; + protected static final int NUM_TASK_MANAGERS = 3; protected static final int NUM_TASK_SLOTS = 4; protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(NUM_TASK_MANAGERS) - .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) - .build()); + private static MiniClusterWithClientResource cluster; + + @Before + public void setup() throws Exception { + Configuration configuration = new Configuration(); + switch (failoverStrategy) { + case RestartPipelinedRegionStrategy: + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); + break; + case RestartAllStrategy: + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full"); + } + + cluster = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + } + + @After + public void shutDownExistingCluster() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } /** * Implementations are expected to assemble the test topology in this function @@ -73,7 +124,25 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger { testProgram(env); - TestUtils.tryExecute(env, "Fault Tolerance Test"); + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + try { + cluster.getClusterClient().submitJob(jobGraph, getClass().getClassLoader()).getJobExecutionResult(); + } + catch (ProgramInvocationException root) { + Throwable cause = root.getCause(); + + // search for nested SuccessExceptions + int depth 
= 0; + while (!(cause instanceof SuccessException)) { + if (cause == null || depth++ == 20) { + root.printStackTrace(); + fail("Test failed: " + root.getMessage()); + } + else { + cause = cause.getCause(); + } + } + } postSubmit(); }