Repository: flink Updated Branches: refs/heads/master d03dd63b7 -> c284745ee
[FLINK-1669] [streaming] Test for streaming recovery from process failure This closes #496 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56afefc1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56afefc1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56afefc1 Branch: refs/heads/master Commit: 56afefc137b50d612e933fd29fcc5e938384631f Parents: d03dd63 Author: mbalassi <mbala...@apache.org> Authored: Wed Mar 18 10:36:22 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Mar 29 18:37:41 2015 +0200 ---------------------------------------------------------------------- .../environment/StreamExecutionEnvironment.java | 11 + .../util/ProcessFailureRecoveryTestBase.java | 456 +++++++++++++++++++ .../ProcessFailureBatchRecoveryITCase.java | 411 ++--------------- .../ProcessFailureStreamingRecoveryITCase.java | 208 +++++++++ 4 files changed, 701 insertions(+), 385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56afefc1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 6dd7947..1307b7a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -159,6 +159,17 @@ public abstract class StreamExecutionEnvironment { } /** + * Sets the number of times that failed tasks are re-executed. A value of zero + * effectively disables fault tolerance. A value of {@code -1} indicates that the system + * default value (as defined in the configuration) should be used. + * + * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. + */ + public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { + config.setNumberOfExecutionRetries(numberOfExecutionRetries); + } + + /** * Sets the maximum time frequency (milliseconds) for the flushing of the * output buffers. By default the output buffers flush frequently to provide * low latency and to aid smooth developer experience. Setting the parameter http://git-wip-us.apache.org/repos/asf/flink/blob/56afefc1/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java new file mode 100644 index 0000000..3aef925 --- /dev/null +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/ProcessFailureRecoveryTestBase.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.HashSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * This is a testbase for tests verifying the behavior of the recovery in the + * case when a TaskManager fails (process is killed) in the middle of a job execution. + * + * The test works with multiple task managers processes by spawning JVMs. + * Initially, it starts a JobManager in process and two TaskManagers JVMs with + * 2 task slots each. + * It submits a program with parallelism 4 and waits until all tasks are brought up. + * Coordination between the test and the tasks happens via checking for the + * existence of temporary files. It then starts another TaskManager, which is + * guaranteed to remain empty (all tasks are already deployed) and kills one of + * the original task managers. The recovery should restart the tasks on the new TaskManager. + */ +public abstract class ProcessFailureRecoveryTestBase { + + protected static final String READY_MARKER_FILE_PREFIX = "ready_"; + protected static final String PROCEED_MARKER_FILE = "proceed"; + + protected static final int PARALLELISM = 4; + + @Test + public void testTaskManagerProcessFailure() { + + final StringWriter processOutput1 = new StringWriter(); + final StringWriter processOutput2 = new StringWriter(); + final StringWriter processOutput3 = new StringWriter(); + + ActorSystem jmActorSystem = null; + Process taskManagerProcess1 = null; + Process taskManagerProcess2 = null; + Process taskManagerProcess3 = null; + + File coordinateTempDir = null; + + try { + // check that we run this test only if the java command + // is available on this machine + String javaCommand = getJavaCommandPath(); + if (javaCommand == null) { + System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable"); + return; + } + + // create a logging file for the process + File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties"); + tempLogFile.deleteOnExit(); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + // coordination between the processes goes through a directory + coordinateTempDir = createTempDirectory(); + + // find a free port to start the JobManager + final int jobManagerPort = NetUtils.getAvailablePort(); + + // start a JobManager + Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); + + Configuration jmConfig = new Configuration(); + jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); + jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); + jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2); + + jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); + ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1(); + + // the TaskManager java command + String[] command = new String[]{ + javaCommand, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), + "-Xms80m", "-Xmx80m", + "-classpath", getCurrentClasspath(), + TaskManagerProcessEntryPoint.class.getName(), + String.valueOf(jobManagerPort) + }; + + // start the first two TaskManager processes + taskManagerProcess1 = new ProcessBuilder(command).start(); + new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1); + taskManagerProcess2 = new ProcessBuilder(command).start(); + new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); + + // we wait for the JobManager to have the two TaskManagers available + // wait for at most 20 seconds + waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000); + + // the program will set a marker file in each of its parallel tasks once they are ready, so that + // this coordinating code is aware of this. + // the program will very slowly consume elements until the marker file (later created by the + // test driver code) is present + final File coordinateDirClosure = coordinateTempDir; + final Throwable[] errorRef = new Throwable[1]; + + // get a trigger for the test program implemented by a subclass + Thread programTrigger = testProgram(jobManagerPort, coordinateDirClosure, errorRef); + + //start the test program + programTrigger.start(); + + // wait until all marker files are in place, indicating that all tasks have started + // max 20 seconds + waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000); + + // start the third TaskManager + taskManagerProcess3 = new ProcessBuilder(command).start(); + new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3); + + // we wait for the third TaskManager to register (20 seconds max) + waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000); + + // kill one of the previous TaskManagers, triggering a failure and recovery + taskManagerProcess1.destroy(); + taskManagerProcess1 = null; + + // we create the marker file which signals the program functions tasks that they can complete + touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE)); + + // wait for at most 2 minutes for the program to complete + programTrigger.join(120000); + + // check that the program really finished + assertFalse("The program did not finish in time", programTrigger.isAlive()); + + // apply post submission checks specified by the subclass + postSubmit(); + + // check whether the program encountered an error + if (errorRef[0] != null) { + Throwable error = errorRef[0]; + error.printStackTrace(); + fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()); + } + + // all seems well :-) + + } catch (Exception e) { + e.printStackTrace(); + printProcessLog("TaskManager 1", processOutput1.toString()); + printProcessLog("TaskManager 2", processOutput2.toString()); + printProcessLog("TaskManager 3", processOutput3.toString()); + fail(e.getMessage()); + } catch (Error e) { + e.printStackTrace(); + printProcessLog("TaskManager 1", processOutput1.toString()); + printProcessLog("TaskManager 2", processOutput2.toString()); + printProcessLog("TaskManager 3", processOutput3.toString()); + throw e; + } finally { + if (taskManagerProcess1 != null) { + taskManagerProcess1.destroy(); + } + if (taskManagerProcess2 != null) { + taskManagerProcess2.destroy(); + } + if (taskManagerProcess3 != null) { + taskManagerProcess3.destroy(); + } + if (jmActorSystem != null) { + jmActorSystem.shutdown(); + } + if (coordinateTempDir != null) { + try { + FileUtils.deleteDirectory(coordinateTempDir); + } + catch (Throwable t) { + // we can ignore this + } + } + } + } + + /** + * The test program should be implemented here in a form of a separate thread. + * This provides a solution for checking that it has been terminated. + * + * @param jobManagerPort The port for submitting the topology to the local cluster + * @param coordinateDirClosure taskmanager failure will be triggered only after proccesses + * have successfully created file under this directory + * @param errorRef Errors passed back to the superclass + * @return thread containing the test program + */ + abstract public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef); + + /** + * Check to be carried out after the completion of the test program thread. + * In case of failed checks {@link java.lang.AssertionError} should be thrown. + * + * @throws Error + * @throws Exception + */ + abstract public void postSubmit() throws Error, Exception; + + + protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) + throws Exception + { + final long deadline = System.currentTimeMillis() + maxDelay; + while (true) { + long remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"); + } + + FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS); + + try { + Future<?> result = Patterns.ask(jobManager, + JobManagerMessages.getRequestNumberRegisteredTaskManager(), + new Timeout(timeout)); + Integer numTMs = (Integer) Await.result(result, timeout); + if (numTMs == numExpected) { + break; + } + } + catch (TimeoutException e) { + // ignore and retry + } + catch (ClassCastException e) { + fail("Wrong response: " + e.getMessage()); + } + } + } + + public static void fileBatchHasEveryNumberLower(int n, String path) throws IOException, AssertionError { + + HashSet<Integer> set = new HashSet<Integer>(n); + + int counter = 0; + File file = new File(path + "-" + counter); + + while (file.exists()) { + + BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); + + String line = bufferedReader.readLine(); + + while (line != null) { + int num = Integer.parseInt(line); + + set.add(num); + + line = bufferedReader.readLine(); + } + + bufferedReader.close(); + file.delete(); + counter++; + file = new File(path + "-" + counter); + } + + for (int i = 0; i < n; i++) { + if (!set.contains(i)) { + throw new AssertionError("Missing number: " + i); + } + } + } + + protected static void printProcessLog(String processName, String log) { + if (log == null || log.length() == 0) { + return; + } + + System.out.println("-----------------------------------------"); + System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName); + System.out.println("-----------------------------------------"); + System.out.println(log); + System.out.println("-----------------------------------------"); + System.out.println(" END SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + } + + protected static File createTempDirectory() throws IOException { + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + for (int i = 0; i < 10; i++) { + File dir = new File(tempDir, UUID.randomUUID().toString()); + if (!dir.exists() && dir.mkdirs()) { + return dir; + } + System.err.println("Could not use temporary directory " + dir.getAbsolutePath()); + } + + throw new IOException("Could not create temporary file directory"); + } + + protected static void touchFile(File file) throws IOException { + if (!file.exists()) { + new FileOutputStream(file).close(); + } + if (!file.setLastModified(System.currentTimeMillis())) { + throw new IOException("Could not touch the file."); + } + } + + protected static void waitForMarkerFiles(File basedir, int num, long timeout) { + long now = System.currentTimeMillis(); + final long deadline = now + timeout; + + + while (now < deadline) { + boolean allFound = true; + + for (int i = 0; i < num; i++) { + File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i); + if (!nextToCheck.exists()) { + allFound = false; + break; + } + } + + if (allFound) { + return; + } + else { + // not all found, wait for a bit + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + now = System.currentTimeMillis(); + } + } + + fail("The tasks were not started within time (" + timeout + "msecs)"); + } + + // -------------------------------------------------------------------------------------------- + + /** + * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager. + */ + public static class TaskManagerProcessEntryPoint { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class); + + public static void main(String[] args) { + try { + int jobManagerPort = Integer.parseInt(args[0]); + + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); + cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); + cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + + TaskManager.runTaskManager(cfg, TaskManager.class); + + // wait forever + Object lock = new Object(); + synchronized (lock) { + lock.wait(); + } + } + catch (Throwable t) { + LOG.error("Failed to start TaskManager process", t); + System.exit(1); + } + } + } + + /** + * Utility class to read the output of a process stream and forward it into a StringWriter. + */ + protected static class PipeForwarder extends Thread { + + private final StringWriter target; + private final InputStream source; + + public PipeForwarder(InputStream source, StringWriter target) { + super("Pipe Forwarder"); + setDaemon(true); + + this.source = source; + this.target = target; + + start(); + } + + @Override + public void run() { + try { + int next; + while ((next = source.read()) != -1) { + target.write(next); + } + } + catch (IOException e) { + // terminate + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/56afefc1/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java index ec498ea..c013fa8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java @@ -18,73 +18,22 @@ package org.apache.flink.test.recovery; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.net.NetUtils; -import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import org.apache.flink.test.util.ProcessFailureRecoveryTestBase; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.util.Arrays; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; /** - * This test verifies the behavior of the recovery in the case when a TaskManager - * fails (process is killed) in the middle of a job execution. - * - * The test works with multiple task managers processes by spawning JVMs. - * Initially, it starts a JobManager in process and two TaskManagers JVMs with - * 2 task slots each. - * It submits a program with parallelism 4 and waits until all tasks are brought up. - * Coordination between the test and the tasks happens via checking for the - * existence of temporary files. It then starts another TaskManager, which is - * guaranteed to remain empty (all tasks are already deployed) and kills one of - * the original task managers. The recovery should restart the tasks on the new TaskManager. + * Test for streaming program behaviour in case of taskmanager failure + * based on {@link ProcessFailureRecoveryTestBase}. */ @SuppressWarnings("serial") -@RunWith(Parameterized.class) -public class ProcessFailureBatchRecoveryITCase { - - private static final String READY_MARKER_FILE_PREFIX = "ready_"; - private static final String PROCEED_MARKER_FILE = "proceed"; - - private static final int PARALLELISM = 4; +public class ProcessFailureBatchRecoveryITCase extends ProcessFailureStreamingRecoveryITCase { private ExecutionMode executionMode; @@ -99,91 +48,19 @@ public class ProcessFailureBatchRecoveryITCase { {ExecutionMode.BATCH}}); } - @Test - public void testTaskManagerProcessFailure() { - - final StringWriter processOutput1 = new StringWriter(); - final StringWriter processOutput2 = new StringWriter(); - final StringWriter processOutput3 = new StringWriter(); - - ActorSystem jmActorSystem = null; - Process taskManagerProcess1 = null; - Process taskManagerProcess2 = null; - Process taskManagerProcess3 = null; - - File coordinateTempDir = null; - - try { - // check that we run this test only if the java command - // is available on this machine - String javaCommand = getJavaCommandPath(); - if (javaCommand == null) { - System.out.println("---- Skipping ProcessFailureBatchRecoveryITCase : Could not find java executable"); - return; - } - - // create a logging file for the process - File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties"); - tempLogFile.deleteOnExit(); - CommonTestUtils.printLog4jDebugConfig(tempLogFile); - - // coordination between the processes goes through a directory - coordinateTempDir = createTempDirectory(); - - // find a free port to start the JobManager - final int jobManagerPort = NetUtils.getAvailablePort(); - - // start a JobManager - Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); + @Override + public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) { - Configuration jmConfig = new Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2); - jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s"); + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + env.setDegreeOfParallelism(PARALLELISM); + env.setNumberOfExecutionRetries(1); - jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); - ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1(); - - // the TaskManager java command - String[] command = new String[] { - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Xms80m", "-Xmx80m", - "-classpath", getCurrentClasspath(), - TaskManagerProcessEntryPoint.class.getName(), - String.valueOf(jobManagerPort) - }; - - // start the first two TaskManager processes - taskManagerProcess1 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1); - taskManagerProcess2 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); - - // we wait for the JobManager to have the two TaskManagers available - // wait for at most 20 seconds - waitUntilNumTaskManagersAreRegistered(jmActor, 2, 20000); - - // the program will set a marker file in each of its parallel tasks once they are ready, so that - // this coordinating code is aware of this. - // the program will very slowly consume elements until the marker file (later created by the - // test driver code) is present - final File coordinateDirClosure = coordinateTempDir; - final Throwable[] errorRef = new Throwable[1]; - - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); - env.setParallelism(PARALLELISM); - env.setNumberOfExecutionRetries(1); - env.getConfig().setExecutionMode(executionMode); - - final long NUM_ELEMENTS = 100000L; - final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS) + final long NUM_ELEMENTS = 100000L; + final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS) // make sure every mapper is involved (no one is skipped because of lazy split assignment) .rebalance() - // the majority of the behavior is in the MapFunction + // the majority of the behavior is in the MapFunction .map(new RichMapFunction<Long, Long>() { private final File proceedFile = new File(coordinateDirClosure, PROCEED_MARKER_FILE); @@ -203,8 +80,7 @@ public class ProcessFailureBatchRecoveryITCase { if (checkForProceedFile) { if (proceedFile.exists()) { checkForProceedFile = false; - } - else { + } else { // otherwise wait so that we make slow progress Thread.sleep(100); } @@ -219,259 +95,24 @@ public class ProcessFailureBatchRecoveryITCase { } }); - // we trigger a program now (in a separate thread) - Thread programTrigger = new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") { - @Override - public void run() { - try { - long sum = result.collect().get(0); - assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum); - } - catch (Throwable t) { - t.printStackTrace(); - errorRef[0] = t; - } - } - }; - programTrigger.start(); - - // wait until all marker files are in place, indicating that all tasks have started - // max 20 seconds - waitForMarkerFiles(coordinateTempDir, PARALLELISM, 20000); - - // start the third TaskManager - taskManagerProcess3 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3); - - // we wait for the third TaskManager to register (20 seconds max) - waitUntilNumTaskManagersAreRegistered(jmActor, 3, 20000); - - // kill one of the previous TaskManagers, triggering a failure and recovery - taskManagerProcess1.destroy(); - taskManagerProcess1 = null; - - // we create the marker file which signals the program functions tasks that they can complete - touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE)); - - // wait for at most 2 minutes for the program to complete - programTrigger.join(120000); - - // check that the program really finished - assertFalse("The program did not finish in time", programTrigger.isAlive()); - - // check whether the program encountered an error - if (errorRef[0] != null) { - Throwable error = errorRef[0]; - error.printStackTrace(); - fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()); - } - - // all seems well :-) - } - catch (Exception e) { - e.printStackTrace(); - printProcessLog("TaskManager 1", processOutput1.toString()); - printProcessLog("TaskManager 2", processOutput2.toString()); - printProcessLog("TaskManager 3", processOutput3.toString()); - fail(e.getMessage()); - } - catch (Error e) { - e.printStackTrace(); - printProcessLog("TaskManager 1", processOutput1.toString()); - printProcessLog("TaskManager 2", processOutput2.toString()); - printProcessLog("TaskManager 3", processOutput3.toString()); - throw e; - } - finally { - if (taskManagerProcess1 != null) { - taskManagerProcess1.destroy(); - } - if (taskManagerProcess2 != null) { - taskManagerProcess2.destroy(); - } - if (taskManagerProcess3 != null) { - taskManagerProcess3.destroy(); - } - if (jmActorSystem != null) { - jmActorSystem.shutdown(); - } - if (coordinateTempDir != null) { + // we trigger program execution in a separate thread + return new Thread("ProcessFailureBatchRecoveryITCase Program Trigger") { + @Override + public void run() { try { - FileUtils.deleteDirectory(coordinateTempDir); - } - catch (Throwable t) { - // we can ignore this + long sum = result.collect().get(0); + assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum); + } catch (Throwable t) { + t.printStackTrace(); + errorRef[0] = t; } } - } + }; } - private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) - throws Exception - { - final long deadline = System.currentTimeMillis() + maxDelay; - while (true) { - long remaining = deadline - System.currentTimeMillis(); - if (remaining <= 0) { - fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"); - } - - FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS); - - try { - Future<?> result = Patterns.ask(jobManager, - JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); - Integer numTMs = (Integer) Await.result(result, timeout); - if (numTMs == numExpected) { - break; - } - } - catch (TimeoutException e) { - // ignore and retry - } - catch (ClassCastException e) { - fail("Wrong response: " + e.getMessage()); - } - } + @Override + public void postSubmit() throws Exception, Error { + // unnecessary } - private static void printProcessLog(String processName, String log) { - if (log == null || log.length() == 0) { - return; - } - - System.out.println("-----------------------------------------"); - System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName); - System.out.println("-----------------------------------------"); - System.out.println(log); - System.out.println("-----------------------------------------"); - System.out.println(" END SPAWNED PROCESS LOG"); - System.out.println("-----------------------------------------"); - } - - private static File createTempDirectory() throws IOException { - File tempDir = new File(System.getProperty("java.io.tmpdir")); - - for (int i = 0; i < 10; i++) { - File dir = new File(tempDir, UUID.randomUUID().toString()); - if (!dir.exists() && dir.mkdirs()) { - return dir; - } - System.err.println("Could not use temporary directory " + dir.getAbsolutePath()); - } - - throw new IOException("Could not create temporary file directory"); - } - - private static void touchFile(File file) throws IOException { - if (!file.exists()) { - new FileOutputStream(file).close(); - } - if (!file.setLastModified(System.currentTimeMillis())) { - throw new IOException("Could not touch the file."); - } - } - - private static void waitForMarkerFiles(File basedir, int num, long timeout) { - long now = System.currentTimeMillis(); - final long deadline = now + timeout; - - - while (now < deadline) { - boolean allFound = true; - - for (int i = 0; i < num; i++) { - File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i); - if (!nextToCheck.exists()) { - allFound = false; - break; - } - } - - if (allFound) { - return; - } - else { - // not all found, wait for a bit - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - - now = System.currentTimeMillis(); - } - } - - fail("The tasks were not started within time (" + timeout + "msecs)"); - } - - // -------------------------------------------------------------------------------------------- - - /** - * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager. - */ - public static class TaskManagerProcessEntryPoint { - - private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class); - - public static void main(String[] args) { - try { - int jobManagerPort = Integer.parseInt(args[0]); - - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - - TaskManager.runTaskManager(cfg, TaskManager.class); - - // wait forever - Object lock = new Object(); - synchronized (lock) { - lock.wait(); - } - } - catch (Throwable t) { - LOG.error("Failed to start TaskManager process", t); - System.exit(1); - } - } - } - - /** - * Utility class to read the output of a process stream and forward it into a StringWriter. - */ - private static class PipeForwarder extends Thread { - - private final StringWriter target; - private final InputStream source; - - public PipeForwarder(InputStream source, StringWriter target) { - super("Pipe Forwarder"); - setDaemon(true); - - this.source = source; - this.target = target; - - start(); - } - - @Override - public void run() { - try { - int next; - while ((next = source.read()) != -1) { - target.write(next); - } - } - catch (IOException e) { - // terminate - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/56afefc1/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java new file mode 100644 index 0000000..1933766 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.sink.RichSinkFunction; +import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.apache.flink.test.util.ProcessFailureRecoveryTestBase; +import org.apache.flink.util.Collector; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +/** + * Test for streaming program behaviour in case of taskmanager failure + * based on {@link ProcessFailureRecoveryTestBase}. + */ +@SuppressWarnings("serial") +public class ProcessFailureStreamingRecoveryITCase extends ProcessFailureRecoveryTestBase { + + private static final String RESULT_PATH = "tempTestOutput"; + private static final int DATA_COUNT = 252; + + @Override + public Thread testProgram(int jobManagerPort, final File coordinateDirClosure, final Throwable[] errorRef) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + env.setDegreeOfParallelism(PARALLELISM); + env.setNumberOfExecutionRetries(1); + env.enableMonitoring(100); + + final DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(DATA_COUNT)) + + // make sure every mapper is involved + .shuffle() + + // populate the coordinate directory so we can proceed to taskmanager failure + .map(new RichMapFunction<Long, Long>() { + + private boolean markerCreated = false; + + @Override + public void open(Configuration parameters) throws IOException { + + if (!markerCreated) { + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + try { + touchFile(new File(coordinateDirClosure, READY_MARKER_FILE_PREFIX + taskIndex)); + } catch (IOException e) { + e.printStackTrace(); + } + markerCreated = true; + } + } + + @Override + public Long map(Long value) throws Exception { + return value; + } + }); + + //write result to temporary file + result.addSink(new RichSinkFunction<Long>() { + + private transient File output; + private transient int outputIndex; + private transient BufferedWriter writer; + + @Override + public void open(Configuration parameters) throws IOException { + outputIndex = 0; + do { + output = new File(RESULT_PATH + "-" + outputIndex); + outputIndex++; + } while (output.exists()); + + writer = new BufferedWriter(new FileWriter(output)); + } + + @Override + public void invoke(Long value) throws Exception { + writer.write(value.toString()); + writer.newLine(); + } + + @Override + public void close(){ + try { + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void cancel() { + close(); + } + + }); + + // we trigger program execution in a separate thread + return new ProgramTrigger(env, errorRef); + } + + @Override + public void postSubmit() throws Exception, Error { + + // checks at least once processing guarantee of the output stream + fileBatchHasEveryNumberLower(DATA_COUNT, RESULT_PATH); + } + + public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> { + + private static final long SLEEP_TIME = 10; + + private long end; + private long collected = 0L; + private long toCollect; + private long coungrence; + private long stepSize; + private transient OperatorState<Long> collectedState; + + public SleepyDurableGenerateSequence(long end){ + this.end = end; + } + + public void open(Configuration parameters) throws Exception { + + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + + if (context.containsState("collected")) { + collectedState = (OperatorState<Long>) context.getState("collected"); + collected = collectedState.getState(); + } else { + collectedState = new OperatorState<Long>(collected); + context.registerState("collected", collectedState); + } + super.open(parameters); + + stepSize = context.getNumberOfParallelSubtasks(); + coungrence = context.getIndexOfThisSubtask(); + toCollect = (end % stepSize > coungrence) ? (end / stepSize + 1) : (end / stepSize); + } + + @Override + public void run(Collector<Long> collector) throws Exception { + while (collected < toCollect){ + collector.collect(collected * stepSize + coungrence); + collectedState.update(collected); + collected++; + Thread.sleep(SLEEP_TIME); + } + + } + + @Override + public void cancel() { + } + } + + public class ProgramTrigger extends Thread { + + StreamExecutionEnvironment env; + Throwable[] errorRef; + + ProgramTrigger(StreamExecutionEnvironment env, Throwable[] errorRef){ + super("ProcessFailureStreamingRecoveryITCase Program Trigger"); + this.env = env; + this.errorRef = errorRef; + } + + @Override + public void run() { + try { + env.execute(); + } + catch (Throwable t) { + t.printStackTrace(); + errorRef[0] = t; + } + } + + } +}