[tests] Add test for restart recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785f2041
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785f2041
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785f2041

Branch: refs/heads/release-0.8
Commit: 785f2041fdf8f8409e7cfcc66451b79d7bf57673
Parents: 4ff3f4c
Author: Stephan Ewen <[email protected]>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 4 17:49:59 2015 +0100

----------------------------------------------------------------------
 .../client/minicluster/NepheleMiniCluster.java  |  40 ++-
 .../client/program/AutoParallelismITCase.java   | 118 --------
 .../flink/api/java/ExecutionEnvironment.java    |   3 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../test/recovery/AutoParallelismITCase.java    | 122 ++++++++
 .../test/recovery/SimpleRecoveryITCase.java     | 287 +++++++++++++++++++
 6 files changed, 448 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index a40b733..79099c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -82,6 +82,9 @@ public class NepheleMiniCluster {
 	private boolean defaultAlwaysCreateDirectory = false;
 
+	private long heartbeatInterval = ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL;
+
+	private long heartbeatTimeout = ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT;
 
 	private JobManager jobManager;
 
@@ -163,11 +166,33 @@ public class NepheleMiniCluster {
 	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
 
-	public int getNumTaskManager() { return numTaskManager; }
+	public int getNumTaskManager() {
+		return numTaskManager;
+	}
 
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+		this.taskManagerNumSlots = taskManagerNumSlots;
+	}
 
-	public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
+	public int getTaskManagerNumSlots() {
+		return taskManagerNumSlots;
+	}
+
+	public void setHeartbeatInterval(long heartbeatInterval) {
+		this.heartbeatInterval = heartbeatInterval;
+	}
+
+	public long getHeartbeatInterval() {
+		return heartbeatInterval;
+	}
+
+	public void setHeartbeatTimeout(long heartbeatTimeout) {
+		this.heartbeatTimeout = heartbeatTimeout;
+	}
+
+	public long getHeartbeatTimeout() {
+		return heartbeatTimeout;
+	}
 
 	// ------------------------------------------------------------------------
 	// Life cycle and Job Submission
@@ -206,7 +231,8 @@ public class NepheleMiniCluster {
 		} else {
 			Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
 					taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
-					defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager);
+					defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager,
+					heartbeatInterval, heartbeatTimeout);
 			GlobalConfiguration.includeConfiguration(conf);
 		}
 
@@ -297,7 +323,8 @@ public class NepheleMiniCluster {
 	public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
 			int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
 			boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
-			int taskManagerNumSlots, int numTaskManager)
+			int taskManagerNumSlots, int numTaskManager,
+			long heartbeatInterval, long heartbeatTimeout)
 	{
 		final Configuration config = new Configuration();
 
@@ -350,6 +377,9 @@ public class NepheleMiniCluster {
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
 
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+
+		config.setLong(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+		config.setLong(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, heartbeatTimeout);
 
 		return config;
 	}
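
With the new setters above, an integration test can shrink failure detection from the defaults down to a couple of seconds. A minimal usage sketch (not part of the diff; it mirrors what SimpleRecoveryITCase below does, and the surrounding test scaffolding is illustrative only):

    NepheleMiniCluster cluster = new NepheleMiniCluster();
    cluster.setNumTaskManager(2);
    cluster.setTaskManagerNumSlots(2);

    // heartbeat every 500 ms, declare a TaskManager dead after 2 s without heartbeats
    cluster.setHeartbeatInterval(500);
    cluster.setHeartbeatTimeout(2000);

    cluster.start();
    try {
        // submit jobs against cluster.getJobManagerRpcPort() ...
    } finally {
        cluster.stop();
    }
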

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
deleted file mode 100644
index c1fa888..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This test verifies that the auto parallelism is properly forwarded to the runtime.
- */
-public class AutoParallelismITCase {
-
-	private static final int NUM_TM = 2;
-	private static final int SLOTS_PER_TM = 7;
-	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
-
-	@Test
-	public void testProgramWithAutoParallelism() {
-
-		NepheleMiniCluster cluster = new NepheleMiniCluster();
-		cluster.setNumTaskManager(NUM_TM);
-		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
-
-		try {
-			cluster.start();
-
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
-			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
-
-			DataSet<Integer> result = env
-					.createInput(new ParallelismDependentInputFormat())
-					.mapPartition(new ParallelismDependentMapPartition());
-
-			List<Integer> resultCollection = new ArrayList<Integer>();
-			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
-
-			env.execute();
-
-			assertEquals(PARALLELISM, resultCollection.size());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			try {
-				cluster.stop();
-			}
-			catch (Throwable t) {
-				// ignore exceptions on shutdown
-			}
-		}
-	}
-
-	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
-
-		private transient boolean emitted;
-
-		@Override
-		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
-			assertEquals(PARALLELISM, numSplits);
-			return super.createInputSplits(numSplits);
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return emitted;
-		}
-
-		@Override
-		public Integer nextRecord(Integer reuse) {
-			if (emitted) {
-				return null;
-			}
-			emitted = true;
-			return 1;
-		}
-	}
-
-	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
-
-		@Override
-		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
-			out.collect(getRuntimeContext().getIndexOfThisSubtask());
-		}
-	}
-}


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 61a74b9..2026ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -813,6 +813,9 @@ public abstract class ExecutionEnvironment {
 		if (getDegreeOfParallelism() > 0) {
 			plan.setDefaultParallelism(getDegreeOfParallelism());
 		}
+		if (getNumberOfExecutionRetries() >= 0) {
+			plan.setNumberOfExecutionRetries(getNumberOfExecutionRetries());
+		}
 
 		try {
 			registerCachedFilesWithPlan(plan);
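
With the ExecutionEnvironment change above, a program only has to set the retry count on the environment and the value travels with the generated plan to the JobManager; judging from the ">= 0" guard, a negative value means "not set" and is left to the system default. An illustrative fragment (not from the commit; it reuses the classes imported by the new tests below, and the actual job logic is a placeholder):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // zero disables restarts, a positive value allows that many restart attempts
    env.setNumberOfExecutionRetries(1);

    List<Long> result = new ArrayList<Long>();
    env.generateSequence(1, 10)
        .rebalance()
        .reduce(new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long a, Long b) {
                return a + b;
            }
        })
        .output(new LocalCollectionOutputFormat<Long>(result));

    env.execute();
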

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1f9cf26..dfd2e39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -735,7 +735,7 @@ public class ExecutionGraph {
 			throw new IllegalStateException("Can only restart job from state restarting.");
 		}
 		if (scheduler == null) {
-			throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
+			throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
 		}
 
 		this.currentExecutions.clear();


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
new file mode 100644
index 0000000..bbfe0a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+
+		NepheleMiniCluster cluster = new NepheleMiniCluster();
+		cluster.setNumTaskManager(NUM_TM);
+		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+		try {
+			cluster.start();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.rebalance()
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utility classes
+	// --------------------------------------------------------------------------------------------
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..bec9c3f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class SimpleRecoveryITCase {
+
+	private static NepheleMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		try {
+			cluster = new NepheleMiniCluster();
+			cluster.setNumTaskManager(2);
+			cluster.setTaskManagerNumSlots(2);
+
+			// these two parameters determine how fast the restart happens
+			cluster.setHeartbeatInterval(500);
+			cluster.setHeartbeatTimeout(2000);
+
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not start test minicluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Exception e) {
+			System.err.println("Error stopping cluster on shutdown");
+			e.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailedRunThenSuccessfulRun() {
+
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			// attempt 1
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+						"localhost", cluster.getJobManagerRpcPort());
+
+				env.setDegreeOfParallelism(4);
+				env.setNumberOfExecutionRetries(0);
+
+				env.generateSequence(1, 10)
+						.rebalance()
+						.map(new FailingMapper1<Long>())
+						.reduce(new ReduceFunction<Long>() {
+							@Override
+							public Long reduce(Long value1, Long value2) {
+								return value1 + value2;
+							}
+						})
+						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+				try {
+					JobExecutionResult res = env.execute();
+					String msg = res == null ?
"null result" : "result in " + res.getNetRuntime(); + fail("The program should have failed, but returned " + msg); + } + catch (ProgramInvocationException e) { + // expected + } + } + + // attempt 2 + { + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRpcPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(0); + + env.generateSequence(1, 10) + .rebalance() + .map(new FailingMapper1<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRestart() { + try { + List<Long> resultCollection = new ArrayList<Long>(); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRpcPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(1); + + env.generateSequence(1, 10) + .rebalance() + .map(new FailingMapper2<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRestartMultipleTimes() { + try { + List<Long> resultCollection = new ArrayList<Long>(); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRpcPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(3); + + env.generateSequence(1, 10) + .rebalance() + .map(new FailingMapper3<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------------------ + + private static class FailingMapper1<T> extends RichMapFunction<T, T> { + + private static int 
failuresBeforeSuccess = 1; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + throw new Exception("Test Failure"); + } + + return value; + } + } + + private static class FailingMapper2<T> extends RichMapFunction<T, T> { + + private static int failuresBeforeSuccess = 1; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + throw new Exception("Test Failure"); + } + + return value; + } + } + + private static class FailingMapper3<T> extends RichMapFunction<T, T> { + + private static int failuresBeforeSuccess = 3; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + throw new Exception("Test Failure"); + } + + return value; + } + } +}
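
A note on the failure counters in the mappers above (my reading of the tests, not part of the commit): each FailingMapperN throws on subtask 1 until its static failuresBeforeSuccess counter reaches zero, so a test needs at least as many execution retries as the counter's initial value, and the pattern presumably relies on the mini cluster running the TaskManagers in the same JVM so the static counter survives the restart. The retry arithmetic for the multi-restart case, as a worked sketch:

    // FailingMapper3 starts with failuresBeforeSuccess = 3:
    //   attempt 1 (original run) -> throws, counter 3 -> 2
    //   attempt 2 (retry 1)      -> throws, counter 2 -> 1
    //   attempt 3 (retry 2)      -> throws, counter 1 -> 0
    //   attempt 4 (retry 3)      -> succeeds, result 1 + 2 + ... + 10 = 55
    // hence testRestartMultipleTimes sets env.setNumberOfExecutionRetries(3).
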
