[FLINK-1667] [runtime] Add test for recovery after TaskManager process failure


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/500ddff4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/500ddff4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/500ddff4

Branch: refs/heads/master
Commit: 500ddff4e3b5b47c7244411e14d76b65eb68563c
Parents: 9edc804
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 18:20:59 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 flink-tests/pom.xml                             |   8 +
 .../ProcessFailureBatchRecoveryITCase.java      | 460 +++++++++++++++++++
 2 files changed, 468 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 17c6b3e..95ca77c 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -125,6 +125,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>org.scalatest</groupId>
                        <artifactId>scalatest_2.10</artifactId>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/500ddff4/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
new file mode 100644
index 0000000..6866fbc
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.commons.io.FileUtils;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test verifies the behavior of the recovery in the case when a TaskManager
+ * fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple TaskManager processes by spawning JVMs.
+ * Initially, it starts a JobManager in process and two TaskManager JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed), and kills one of
+ * the original TaskManagers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
+public class ProcessFailureBatchRecoveryITCase {
+
+       private static final String READY_MARKER_FILE_PREFIX = "ready_";
+       private static final String PROCEED_MARKER_FILE = "proceed";
+
+       private static final int PARALLELISM = 4;
+
+       /**
+        * Runs a batch job on two spawned TaskManager JVMs, starts a third one once all tasks
+        * are running, kills one of the original TaskManagers, and then checks that the job
+        * still finishes with the correct result.
+        *
+        * <p>The test returns without failing when no java executable can be found.
+        */
+       @Test
+       public void testTaskManagerProcessFailure() {
+
+               final StringWriter processOutput1 = new StringWriter();
+               final StringWriter processOutput2 = new StringWriter();
+               final StringWriter processOutput3 = new StringWriter();
+
+               ActorSystem jmActorSystem = null;
+               Process taskManagerProcess1 = null;
+               Process taskManagerProcess2 = null;
+               Process taskManagerProcess3 = null;
+
+               File coordinateTempDir = null;
+
+               try {
+                       // check that we run this test only if the java command
+                       // is available on this machine
+                       String javaCommand = getJavaCommandPath();
+                       if (javaCommand == null) {
+                               System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable");
+                               return;
+                       }
+
+                       // create a logging file for the process
+                       File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+                       tempLogFile.deleteOnExit();
+                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+                       // coordination between the processes goes through a directory
+                       coordinateTempDir = createTempDirectory();
+
+                       // find a free port to start the JobManager
+                       final int jobManagerPort = NetUtils.getAvailablePort();
+
+                       // start a JobManager
+                       Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+                       // short heartbeat interval / pause for the JobManager's actor system
+                       Configuration jmConfig = new Configuration();
+                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+                       jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+
+                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+                       ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();
+
+                       // the TaskManager java command
+                       String[] command = new String[] {
+                                       javaCommand,
+                                       "-Dlog.level=DEBUG",
+                                       "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+                                       "-Xms80m", "-Xmx80m",
+                                       "-classpath", getCurrentClasspath(),
+                                       TaskManagerProcessEntryPoint.class.getName(),
+                                       String.valueOf(jobManagerPort)
+                       };
+
+                       // start the first two TaskManager processes
+                       // stderr is merged into stdout so that one forwarder drains everything the child
+                       // writes; an output pipe that nobody reads can fill up and block the child JVM
+                       taskManagerProcess1 = new ProcessBuilder(command).redirectErrorStream(true).start();
+                       new PipeForwarder(taskManagerProcess1.getInputStream(), processOutput1);
+                       taskManagerProcess2 = new ProcessBuilder(command).redirectErrorStream(true).start();
+                       new PipeForwarder(taskManagerProcess2.getInputStream(), processOutput2);
+
+                       // we wait for the JobManager to have the two TaskManagers available
+                       // wait for at most 20 seconds
+                       waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000);
+
+                       // the program will set a marker file in each of its parallel tasks once they are ready, so that
+                       // this coordinating code is aware of this.
+                       // the program will very slowly consume elements until the marker file (later created by the
+                       // test driver code) is present
+                       final File coordinateDirClosure = coordinateTempDir;
+                       final Throwable[] errorRef = new Throwable[1];
+
+                       ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+                       env.setDegreeOfParallelism(PARALLELISM);
+                       env.setNumberOfExecutionRetries(1);
+
+                       final long NUM_ELEMENTS = 1000000L;
+                       final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)
+
+                               // make sure every mapper is involved (no one is skipped because of lazy split assignment)
+                               .rebalance()
+                               // the majority of the behavior is in the MapFunction
+                               .map(new RichMapFunction<Long, Long>() {
+
+                                       private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE);
+
+                                       private boolean markerCreated = false;
+                                       private boolean checkForProceedFile = true;
+
+                                       @Override
+                                       public Long map(Long value) throws Exception {
+                                               if (!markerCreated) {
+                                                       int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+                                                       touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex));
+                                                       markerCreated = true;
+                                               }
+
+                                               // check if the proceed file exists
+                                               if (checkForProceedFile) {
+                                                       if (proceedFile.exists()) {
+                                                               checkForProceedFile = false;
+                                                       }
+                                                       else {
+                                                               // otherwise wait so that we make slow progress
+                                                               Thread.sleep(10);
+                                                       }
+                                               }
+                                               return value;
+                                       }
+                               })
+                               .reduce(new ReduceFunction<Long>() {
+                                       @Override
+                                       public Long reduce(Long value1, Long value2) {
+                                               return value1 + value2;
+                                       }
+                               });
+
+                       // we trigger a program now (in a separate thread)
+                       Thread programTrigger = new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               long sum = result.collect().get(0);
+                                               assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
+                                       }
+                                       catch (Throwable t) {
+                                               t.printStackTrace();
+                                               errorRef[0] = t;
+                                       }
+                               }
+                       };
+                       programTrigger.start();
+
+                       // wait until all marker files are in place, indicating that all tasks have started
+                       // max 20 seconds
+                       waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000);
+
+                       // start the third TaskManager
+                       taskManagerProcess3 = new ProcessBuilder(command).redirectErrorStream(true).start();
+                       new PipeForwarder(taskManagerProcess3.getInputStream(), processOutput3);
+
+                       // we wait for the third TaskManager to register (20 seconds max)
+                       waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000);
+
+                       // kill one of the previous TaskManagers, triggering a failure and recovery
+                       taskManagerProcess1.destroy();
+                       taskManagerProcess1 = null;
+
+                       // we create the marker file which signals the program functions tasks that they can complete
+                       touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+                       // wait for at most 30 seconds for the program to complete
+                       programTrigger.join(30000);
+
+                       // check that the program really finished
+                       assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+                       // check whether the program encountered an error
+                       if (errorRef[0] != null) {
+                               Throwable error = errorRef[0];
+                               error.printStackTrace();
+                               fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+                       }
+
+                       // all seems well :-)
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       printProcessLog("TaskManager 1", processOutput1.toString());
+                       printProcessLog("TaskManager 2", processOutput2.toString());
+                       printProcessLog("TaskManager 3", processOutput3.toString());
+                       fail(e.getMessage());
+               }
+               catch (Error e) {
+                       // assertion failures are Errors: dump the child process logs, then re-throw
+                       e.printStackTrace();
+                       printProcessLog("TaskManager 1", processOutput1.toString());
+                       printProcessLog("TaskManager 2", processOutput2.toString());
+                       printProcessLog("TaskManager 3", processOutput3.toString());
+                       throw e;
+               }
+               finally {
+                       if (taskManagerProcess1 != null) {
+                               taskManagerProcess1.destroy();
+                       }
+                       if (taskManagerProcess2 != null) {
+                               taskManagerProcess2.destroy();
+                       }
+                       if (taskManagerProcess3 != null) {
+                               taskManagerProcess3.destroy();
+                       }
+                       if (jmActorSystem != null) {
+                               jmActorSystem.shutdown();
+                       }
+                       if (coordinateTempDir != null) {
+                               try {
+                                       FileUtils.deleteDirectory(coordinateTempDir);
+                               }
+                               catch (Throwable t) {
+                                       // we can ignore this
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Repeatedly asks the JobManager for its number of registered TaskManagers until the
+        * reply equals the expected number. Fails the test when the deadline passes first, or
+        * when the reply is not an Integer.
+        *
+        * @param jobManager The JobManager actor to ask.
+        * @param numExpected The number of registered TaskManagers to wait for.
+        * @param maxDelay The maximum time to wait, in milliseconds.
+        * @throws Exception Thrown if the ask fails for a reason other than a timeout, or if
+        *                   the pause between two attempts is interrupted.
+        */
+       private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+               throws Exception
+       {
+               final long deadline = System.currentTimeMillis() + maxDelay;
+               while (true) {
+                       long remaining = deadline - System.currentTimeMillis();
+                       if (remaining <= 0) {
+                               fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+                       }
+
+                       FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+                       try {
+                               Future<?> result = Patterns.ask(jobManager,
+                                               JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+                                               new Timeout(timeout));
+                               Integer numTMs = (Integer) Await.result(result, timeout);
+                               if (numTMs == numExpected) {
+                                       break;
+                               }
+                       }
+                       catch (TimeoutException e) {
+                               // ignore and retry (the deadline check at the top of the loop ends the wait)
+                               continue;
+                       }
+                       catch (ClassCastException e) {
+                               fail("Wrong response: " + e.getMessage());
+                       }
+
+                       // the count does not match yet: pause briefly instead of re-asking in a hot loop
+                       Thread.sleep(Math.min(50L, remaining));
+               }
+       }
+
+       private static void printProcessLog(String processName, String log) {
+               if (log == null || log.length() == 0) {
+                       return;
+               }
+
+               System.out.println("-----------------------------------------");
+               System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + 
processName);
+               System.out.println("-----------------------------------------");
+               System.out.println(log);
+               System.out.println("-----------------------------------------");
+               System.out.println("        END SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+       }
+
+       private static File createTempDirectory() throws IOException {
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               for (int i = 0; i < 10; i++) {
+                       File dir = new File(tempDir, 
UUID.randomUUID().toString());
+                       if (!dir.exists() && dir.mkdirs()) {
+                               return dir;
+                       }
+                       System.err.println("Could not use temporary directory " 
+ dir.getAbsolutePath());
+               }
+
+               throw new IOException("Could not create temporary file 
directory");
+       }
+
+       private static void touchFile(File file) throws IOException {
+               if (!file.exists()) {
+                       new FileOutputStream(file).close();
+               }
+               if (!file.setLastModified(System.currentTimeMillis())) {
+                       throw new IOException("Could not touch the file.");
+               }
+       }
+
+       /**
+        * Waits until the ready-marker files with the indexes 0 to (num - 1) all exist in the
+        * given directory, checking every 10 milliseconds. Fails the test if they are not all
+        * present before the timeout expires.
+        *
+        * @param basedir The directory in which the marker files are expected.
+        * @param num The number of marker files to wait for.
+        * @param timeout The maximum time to wait, in milliseconds.
+        */
+       private static void waitForMarkerFiles(File basedir, int num, long timeout) {
+               long now = System.currentTimeMillis();
+               final long deadline = now + timeout;
+
+
+               while (now < deadline) {
+                       boolean allFound = true;
+
+                       for (int i = 0; i < num; i++) {
+                               File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
+                               if (!nextToCheck.exists()) {
+                                       allFound = false;
+                                       break;
+                               }
+                       }
+
+                       if (allFound) {
+                               return;
+                       }
+                       else {
+                               // not all found, wait for a bit
+                               try {
+                                       Thread.sleep(10);
+                               }
+                               catch (InterruptedException e) {
+                                       // restore the interrupt status, so that code further up the
+                                       // stack can still see that this thread was interrupted
+                                       Thread.currentThread().interrupt();
+                                       throw new RuntimeException(e);
+                               }
+
+                               now = System.currentTimeMillis();
+                       }
+               }
+
+               fail("The tasks were not started within time (" + timeout + "msecs)");
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * The entry point for the TaskManager JVM. Simply configures and runs 
a TaskManager.
+        */
+       public static class TaskManagerProcessEntryPoint {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+               public static void main(String[] args) {
+                       try {
+                               int jobManagerPort = Integer.parseInt(args[0]);
+
+                               Configuration cfg = new Configuration();
+                               
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                               
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+                               TaskManager.runTaskManager(cfg, 
TaskManager.class);
+
+                               // wait forever
+                               Object lock = new Object();
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start TaskManager 
process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+
+       /**
+        * Daemon thread that reads an input stream (for example the output of a spawned
+        * process) until its end and appends everything it reads to a StringWriter.
+        * The thread starts itself in the constructor.
+        */
+       private static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+
+                       this.target = target;
+                       this.source = source;
+
+                       // both fields are assigned before the thread is started
+                       setDaemon(true);
+                       start();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               // copy one value at a time until the end of the stream (-1) is reached
+                               for (int value = source.read(); value != -1; value = source.read()) {
+                                       target.write(value);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate
+                       }
+               }
+       }
+}

Reply via email to