[tests] Add test for restart recovery

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785f2041
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785f2041
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785f2041

Branch: refs/heads/release-0.8
Commit: 785f2041fdf8f8409e7cfcc66451b79d7bf57673
Parents: 4ff3f4c
Author: Stephan Ewen <[email protected]>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 4 17:49:59 2015 +0100

----------------------------------------------------------------------
 .../client/minicluster/NepheleMiniCluster.java  |  40 ++-
 .../client/program/AutoParallelismITCase.java   | 118 --------
 .../flink/api/java/ExecutionEnvironment.java    |   3 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../test/recovery/AutoParallelismITCase.java    | 122 ++++++++
 .../test/recovery/SimpleRecoveryITCase.java     | 287 +++++++++++++++++++
 6 files changed, 448 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index a40b733..79099c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -82,6 +82,9 @@ public class NepheleMiniCluster {
        
        private boolean defaultAlwaysCreateDirectory = false;
 
+       private long heartbeatInterval = 
ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL;
+
+       private long heartbeatTimeout = 
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT;
        
        private JobManager jobManager;
 
@@ -163,11 +166,33 @@ public class NepheleMiniCluster {
 
        public void setNumTaskManager(int numTaskManager) { this.numTaskManager 
= numTaskManager; }
 
-       public int getNumTaskManager() { return numTaskManager; }
+       public int getNumTaskManager() {
+               return numTaskManager;
+       }
 
-       public void setTaskManagerNumSlots(int taskManagerNumSlots) { 
this.taskManagerNumSlots = taskManagerNumSlots; }
+       public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+               this.taskManagerNumSlots = taskManagerNumSlots;
+       }
 
-       public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
+       public int getTaskManagerNumSlots() {
+               return taskManagerNumSlots;
+       }
+
+       public void setHeartbeatInterval(long heartbeatInterval) {
+               this.heartbeatInterval = heartbeatInterval;
+       }
+
+       public long getHeartbeatInterval() {
+               return heartbeatInterval;
+       }
+
+       public void setHeartbeatTimeout(long heartbeatTimeout) {
+               this.heartbeatTimeout = heartbeatTimeout;
+       }
+
+       public long getHeartbeatTimeout() {
+               return heartbeatTimeout;
+       }
 
        // 
------------------------------------------------------------------------
        // Life cycle and Job Submission
@@ -206,7 +231,8 @@ public class NepheleMiniCluster {
                        } else {
                                Configuration conf = 
getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
                                        taskManagerDataPort, memorySize, 
hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
-                                               defaultAlwaysCreateDirectory, 
taskManagerNumSlots, numTaskManager);
+                                               defaultAlwaysCreateDirectory, 
taskManagerNumSlots, numTaskManager,
+                                       heartbeatInterval, heartbeatTimeout);
                                GlobalConfiguration.includeConfiguration(conf);
                        }
 
@@ -297,7 +323,8 @@ public class NepheleMiniCluster {
        public static Configuration getMiniclusterDefaultConfig(int 
jobManagerRpcPort, int taskManagerRpcPort,
                        int taskManagerDataPort, long memorySize, String 
hdfsConfigFile, boolean lazyMemory,
                        boolean defaultOverwriteFiles, boolean 
defaultAlwaysCreateDirectory,
-                       int taskManagerNumSlots, int numTaskManager)
+                       int taskManagerNumSlots, int numTaskManager,
+                       long heartbeatInterval, long heartbeatTimeout)
        {
                final Configuration config = new Configuration();
                
@@ -350,6 +377,9 @@ public class NepheleMiniCluster {
                
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskManager);
 
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
+
+               
config.setLong(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 
heartbeatInterval);
+               
config.setLong(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 
heartbeatTimeout);
                
                return config;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
deleted file mode 100644
index c1fa888..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This test verifies that the auto parallelism is properly forwarded to the 
runtime.
- */
-public class AutoParallelismITCase {
-
-       private static final int NUM_TM = 2;
-       private static final int SLOTS_PER_TM = 7;
-       private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
-
-       @Test
-       public void testProgramWithAutoParallelism() {
-
-               NepheleMiniCluster cluster = new NepheleMiniCluster();
-               cluster.setNumTaskManager(NUM_TM);
-               cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
-
-               try {
-                       cluster.start();
-
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getJobManagerRpcPort());
-                       
env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
-
-                       DataSet<Integer> result = env
-                                       .createInput(new 
ParallelismDependentInputFormat())
-                                       .mapPartition(new 
ParallelismDependentMapPartition());
-
-                       List<Integer> resultCollection = new 
ArrayList<Integer>();
-                       result.output(new 
LocalCollectionOutputFormat<Integer>(resultCollection));
-
-                       env.execute();
-
-                       assertEquals(PARALLELISM, resultCollection.size());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       try {
-                               cluster.stop();
-                       }
-                       catch (Throwable t) {
-                               // ignore exceptions on shutdown
-                       }
-               }
-       }
-
-       private static class ParallelismDependentInputFormat extends 
GenericInputFormat<Integer> {
-
-               private transient boolean emitted;
-
-               @Override
-               public GenericInputSplit[] createInputSplits(int numSplits) 
throws IOException {
-                       assertEquals(PARALLELISM, numSplits);
-                       return super.createInputSplits(numSplits);
-               }
-
-               @Override
-               public boolean reachedEnd() {
-                       return emitted;
-               }
-
-               @Override
-               public Integer nextRecord(Integer reuse) {
-                       if (emitted) {
-                               return null;
-                       }
-                       emitted = true;
-                       return 1;
-               }
-       }
-
-       private static class ParallelismDependentMapPartition extends 
RichMapPartitionFunction<Integer, Integer> {
-
-               @Override
-               public void mapPartition(Iterable<Integer> values, 
Collector<Integer> out) {
-                       
out.collect(getRuntimeContext().getIndexOfThisSubtask());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 61a74b9..2026ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -813,6 +813,9 @@ public abstract class ExecutionEnvironment {
                if (getDegreeOfParallelism() > 0) {
                        plan.setDefaultParallelism(getDegreeOfParallelism());
                }
+               if (getNumberOfExecutionRetries() >= 0) {
+                       
plan.setNumberOfExecutionRetries(getNumberOfExecutionRetries());
+               }
 
                try {
                        registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1f9cf26..dfd2e39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -735,7 +735,7 @@ public class ExecutionGraph {
                                        throw new IllegalStateException("Can 
only restart job from state restarting.");
                                }
                                if (scheduler == null) {
-                                       throw new IllegalStateException("The 
execution graph has not been schedudled before - scheduler is null.");
+                                       throw new IllegalStateException("The 
execution graph has not been scheduled before - scheduler is null.");
                                }
                                
                                this.currentExecutions.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
new file mode 100644
index 0000000..bbfe0a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the 
runtime.
+ */
+public class AutoParallelismITCase {
+
+       private static final int NUM_TM = 2;
+       private static final int SLOTS_PER_TM = 7;
+       private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+       @Test
+       public void testProgramWithAutoParallelism() {
+
+               NepheleMiniCluster cluster = new NepheleMiniCluster();
+               cluster.setNumTaskManager(NUM_TM);
+               cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+               try {
+                       cluster.start();
+
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getJobManagerRpcPort());
+                       
env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+                       DataSet<Integer> result = env
+                                       .createInput(new 
ParallelismDependentInputFormat())
+                                       .rebalance()
+                                       .mapPartition(new 
ParallelismDependentMapPartition());
+
+                       List<Integer> resultCollection = new 
ArrayList<Integer>();
+                       result.output(new 
LocalCollectionOutputFormat<Integer>(resultCollection));
+
+                       env.execute();
+
+                       assertEquals(PARALLELISM, resultCollection.size());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       try {
+                               cluster.stop();
+                       }
+                       catch (Throwable t) {
+                               // ignore exceptions on shutdown
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utility classes
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class ParallelismDependentInputFormat extends 
GenericInputFormat<Integer> {
+
+               private transient boolean emitted;
+
+               @Override
+               public GenericInputSplit[] createInputSplits(int numSplits) 
throws IOException {
+                       assertEquals(PARALLELISM, numSplits);
+                       return super.createInputSplits(numSplits);
+               }
+
+               @Override
+               public boolean reachedEnd() {
+                       return emitted;
+               }
+
+               @Override
+               public Integer nextRecord(Integer reuse) {
+                       if (emitted) {
+                               return null;
+                       }
+                       emitted = true;
+                       return 1;
+               }
+       }
+
+       private static class ParallelismDependentMapPartition extends 
RichMapPartitionFunction<Integer, Integer> {
+
+               @Override
+               public void mapPartition(Iterable<Integer> values, 
Collector<Integer> out) {
+                       
out.collect(getRuntimeContext().getIndexOfThisSubtask());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..bec9c3f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class SimpleRecoveryITCase {
+
+       private static NepheleMiniCluster cluster;
+
+       @BeforeClass
+       public static void setupCluster() {
+               try {
+                       cluster = new NepheleMiniCluster();
+                       cluster.setNumTaskManager(2);
+                       cluster.setTaskManagerNumSlots(2);
+
+                       // these two parameters determine how fast the restart 
happens
+                       cluster.setHeartbeatInterval(500);
+                       cluster.setHeartbeatTimeout(2000);
+
+                       cluster.start();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Could not start test minicluster: " + 
e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDownCluster() {
+               try {
+                       cluster.stop();
+               }
+               catch (Exception e) {
+                       System.err.println("Error stopping cluster on 
shutdown");
+                       e.printStackTrace();
+                       fail("Cluster shutdown caused an exception: " + 
e.getMessage());
+               }
+       }
+
+       @Test
+       public void testFailedRunThenSuccessfulRun() {
+
+               try {
+                       List<Long> resultCollection = new ArrayList<Long>();
+
+                       // attempt 1
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                               "localhost", 
cluster.getJobManagerRpcPort());
+
+                               env.setDegreeOfParallelism(4);
+                               env.setNumberOfExecutionRetries(0);
+
+                               env.generateSequence(1, 10)
+                                               .rebalance()
+                                               .map(new FailingMapper1<Long>())
+                                               .reduce(new 
ReduceFunction<Long>() {
+                                                       @Override
+                                                       public Long reduce(Long 
value1, Long value2) {
+                                                               return value1 + 
value2;
+                                                       }
+                                               })
+                                               .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
+
+                               try {
+                                       JobExecutionResult res = env.execute();
+                                       String msg = res == null ? "null 
result" : "result in " + res.getNetRuntime();
+                                       fail("The program should have failed, 
but returned " + msg);
+                               }
+                               catch (ProgramInvocationException e) {
+                                       // expected
+                               }
+                       }
+
+                       // attempt 2
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                               "localhost", 
cluster.getJobManagerRpcPort());
+
+                               env.setDegreeOfParallelism(4);
+                               env.setNumberOfExecutionRetries(0);
+
+                               env.generateSequence(1, 10)
+                                               .rebalance()
+                                               .map(new FailingMapper1<Long>())
+                                               .reduce(new 
ReduceFunction<Long>() {
+                                                       @Override
+                                                       public Long reduce(Long 
value1, Long value2) {
+                                                               return value1 + 
value2;
+                                                       }
+                                               })
+                                               .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
+
+                               try {
+                                       JobExecutionResult result = 
env.execute();
+                                       assertTrue(result.getNetRuntime() >= 0);
+                                       
assertNotNull(result.getAllAccumulatorResults());
+                                       
assertTrue(result.getAllAccumulatorResults().isEmpty());
+                               }
+                               catch (JobExecutionException e) {
+                                       fail("The program should have succeeded 
on the second run");
+                               }
+
+                               long sum = 0;
+                               for (long l : resultCollection) {
+                                       sum += l;
+                               }
+                               assertEquals(55, sum);
+                       }
+
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testRestart() {
+               try {
+                       List<Long> resultCollection = new ArrayList<Long>();
+
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getJobManagerRpcPort());
+
+                       env.setDegreeOfParallelism(4);
+                       env.setNumberOfExecutionRetries(1);
+
+                       env.generateSequence(1, 10)
+                                       .rebalance()
+                                       .map(new FailingMapper2<Long>())
+                                       .reduce(new ReduceFunction<Long>() {
+                                               @Override
+                                               public Long reduce(Long value1, 
Long value2) {
+                                                       return value1 + value2;
+                                               }
+                                       })
+                                       .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
+
+                       try {
+                               JobExecutionResult result = env.execute();
+                               assertTrue(result.getNetRuntime() >= 0);
+                               
assertNotNull(result.getAllAccumulatorResults());
+                               
assertTrue(result.getAllAccumulatorResults().isEmpty());
+                       }
+                       catch (JobExecutionException e) {
+                               fail("The program should have succeeded on the 
second run");
+                       }
+
+                       long sum = 0;
+                       for (long l : resultCollection) {
+                               sum += l;
+                       }
+                       assertEquals(55, sum);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testRestartMultipleTimes() {
+               try {
+                       List<Long> resultCollection = new ArrayList<Long>();
+
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getJobManagerRpcPort());
+
+                       env.setDegreeOfParallelism(4);
+                       env.setNumberOfExecutionRetries(3);
+
+                       env.generateSequence(1, 10)
+                                       .rebalance()
+                                       .map(new FailingMapper3<Long>())
+                                       .reduce(new ReduceFunction<Long>() {
+                                               @Override
+                                               public Long reduce(Long value1, 
Long value2) {
+                                                       return value1 + value2;
+                                               }
+                                       })
+                                       .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
+
+                       try {
+                               JobExecutionResult result = env.execute();
+                               assertTrue(result.getNetRuntime() >= 0);
+                               
assertNotNull(result.getAllAccumulatorResults());
+                               
assertTrue(result.getAllAccumulatorResults().isEmpty());
+                       }
+                       catch (JobExecutionException e) {
+                               fail("The program should have succeeded on the 
second run");
+                       }
+
+                       long sum = 0;
+                       for (long l : resultCollection) {
+                               sum += l;
+                       }
+                       assertEquals(55, sum);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       // 
------------------------------------------------------------------------------------
+
+       private static class FailingMapper1<T> extends RichMapFunction<T, T> {
+
+               private static int failuresBeforeSuccess = 1;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class FailingMapper2<T> extends RichMapFunction<T, T> {
+
+               private static int failuresBeforeSuccess = 1;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class FailingMapper3<T> extends RichMapFunction<T, T> {
+
+               private static int failuresBeforeSuccess = 3;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+}

Reply via email to