[tests] Add process reaping test for TaskManager, improve process reaping test for JobManager.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1230bcaa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1230bcaa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1230bcaa Branch: refs/heads/master Commit: 1230bcaa0f531f6260f291dd066f64fe52cc6708 Parents: 70df028 Author: Stephan Ewen <se...@apache.org> Authored: Tue Feb 24 12:23:59 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 24 12:23:59 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 94 ++++---- .../flink/runtime/taskmanager/TaskManager.scala | 68 +++++- .../JobManagerProcessReapingTest.java | 83 ++++++- .../TaskManagerProcessReapingTest.java | 236 +++++++++++++++++++ .../runtime/testutils/CommonTestUtils.java | 24 ++ 5 files changed, 447 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index a1642b4..2671f2d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -94,17 +94,18 @@ class JobManager(val configuration: Configuration, val profiler: Option[ActorRef], val defaultExecutionRetries: Int, val delayBetweenRetries: Long, - implicit val timeout: FiniteDuration) + val timeout: FiniteDuration) extends Actor with ActorLogMessages with ActorLogging { - import context._ - - val LOG = JobManager.LOG - - // List of current jobs running - val currentJobs = 
scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + /** Reference to the log, for debugging */ + protected val LOG = JobManager.LOG + /** List of currently running jobs */ + protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + /** + * Run when the job manager is started. Simply logs an informational message. + */ override def preStart(): Unit = { LOG.info(s"Starting JobManager at ${self.path}.") } @@ -138,6 +139,11 @@ class JobManager(val configuration: Configuration, } } + /** + * Central work method of the JobManager actor. Receives messages and reacts to them. + * + * @return + */ override def receiveWithLogMessages: Receive = { case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => @@ -182,7 +188,7 @@ class JobManager(val configuration: Configuration, // execute the cancellation asynchronously Future { executionGraph.cancel() - } + }(context.dispatcher) sender ! CancellationSuccess(jobID) case None => @@ -198,10 +204,12 @@ class JobManager(val configuration: Configuration, currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => val originalSender = sender + Future { val result = executionGraph.updateState(taskExecutionState) originalSender ! result - } + }(context.dispatcher) + sender ! 
true case None => log.error("Cannot find execution graph for ID {} to change state to {}.", taskExecutionState.getJobID, taskExecutionState.getExecutionState) @@ -603,6 +611,7 @@ object JobManager { EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") checkJavaVersion() + // parsing the command line arguments val (configuration: Configuration, executionMode: ExecutionMode, listeningHost: String, listeningPort: Int) = @@ -617,16 +626,17 @@ object JobManager { } } - // we may want to check that the JobManager hostname is in the config + // we want to check that the JobManager hostname is in the config // if it is not in there, the actor system will bind to the loopback interface's // address and will not be reachable from anyone remote if (listeningHost == null) { val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY + - "' is missing (hostname or address to bind JobManager to)." + "' is missing (hostname/address to bind JobManager to)." LOG.error(message) System.exit(STARTUP_FAILURE_RETURN_CODE) } + // run the job manager try { if (SecurityUtils.isSecurityEnabled) { LOG.info("Security is enabled. Starting secure JobManager.") @@ -647,13 +657,28 @@ object JobManager { } } - + /** + * Starts and runs the JobManager with all its components. First, this method starts a + * dedicated actor system for the JobManager. Second, it starts all components of the + * JobManager (including library cache, instance manager, scheduler). Finally, it starts + * the JobManager actor itself. + * + * This method blocks indefinitely (or until the JobManager's actor system is shut down). + * + * @param configuration The configuration object for the JobManager. + * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn + * an additional TaskManager in the same process. + * @param listeningAddress The hostname where the JobManager should listen for messages. 
+ * @param listeningPort The port where the JobManager should listen for messages. + */ def runJobManager(configuration: Configuration, executionMode: ExecutionMode, listeningAddress: String, listeningPort: Int) : Unit = { LOG.info("Starting JobManager") + + // Bring up the job manager actor system first, bind it to the given address. LOG.debug("Starting JobManager actor system") val jobManagerSystem = try { @@ -674,18 +699,18 @@ object JobManager { } try { - LOG.debug("Starting JobManager actor") - // bring up the job manager actor + LOG.debug("Starting JobManager actor") val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem) - // start a process reaper that watches the JobManager + // start a process reaper that watches the JobManager. If the JobManager actor dies, + // the process reaper will kill the JVM process (to ensure easy failure detection) jobManagerSystem.actorOf( Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE), "JobManager_Process_Reaper") // bring up a local task manager, if needed - if(executionMode.equals(LOCAL)){ + if (executionMode.equals(LOCAL)) { LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") TaskManager.startActorWithConfiguration("", TaskManager.TASK_MANAGER_NAME, configuration, @@ -757,15 +782,22 @@ object JobManager { } /** - * Extracts the job manager configuration values from a configuration instance. + * Create the job manager components as (instanceManager, scheduler, libraryCacheManager, + * archiverProps, accumulatorManager, profiler, defaultExecutionRetries, + * delayBetweenRetries, timeout) * - * @param configuration Object with the user provided configuration values - * @return Tuple of (number of archived jobs, profiling enabled, cleanup interval of the library - * cache manager, default number of execution retries, delay between retries) + * @param configuration The configuration from which to parse the config values. 
+ * @return The members for a default JobManager. */ - def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, Int, Long) = { + def createJobManagerComponents(configuration: Configuration) : + (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, + Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = { + + val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) + val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) + val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, false) val cleanupInterval = configuration.getLong( @@ -780,26 +812,6 @@ object JobManager { ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis - (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) - } - - /** - * Create the job manager members as (instanceManager, scheduler, libraryCacheManager, - * archiverProps, accumulatorManager, profiler, defaultExecutionRetries, - * delayBetweenRetries, timeout) - * - * @param configuration The configuration from which to parse the config values. - * @return The members for a default JobManager. 
- */ - def createJobManagerComponents(configuration: Configuration) : - (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, - Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = { - - val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) - - val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) = - parseConfiguration(configuration) - val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount) val profilerProps: Option[Props] = if (profilingEnabled) { http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 9095eae..52a16bd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -53,6 +53,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskManagerProfilerMessages .{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener} import org.apache.flink.runtime.net.NetUtils +import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.profiling.ProfilingUtils import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner @@ -60,7 +61,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation import org.apache.flink.util.ExceptionUtils import org.slf4j.LoggerFactory -import scala.concurrent.Future +import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{Failure, Success} import scala.language.postfixOps @@ 
-721,7 +722,9 @@ import scala.collection.JavaConverters._ object TaskManager { val LOG = LoggerFactory.getLogger(classOf[TaskManager]) - val FAILURE_RETURN_CODE = -1 + + val STARTUP_FAILURE_RETURN_CODE = 1 + val RUNTIME_FAILURE_RETURN_CODE = 2 val TASK_MANAGER_NAME = "taskmanager" val PROFILER_NAME = "profiler" @@ -750,9 +753,16 @@ object TaskManager { def startActor(hostname: String, port: Int, configuration: Configuration, taskManagerName: String) : Unit = { - val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration, + val (taskManagerSystem, taskManager) = startActorSystemAndActor(hostname, port, configuration, taskManagerName, localAkkaCommunication = false, localTaskManagerCommunication = false) + // start a process reaper that watches the TaskManager. If the TaskManager actor dies, + // the process reaper will kill the JVM process (to ensure easy failure detection) + taskManagerSystem.actorOf( + Props(classOf[ProcessReaper], taskManager, LOG, RUNTIME_FAILURE_RETURN_CODE), + "TaskManager_Process_Reaper") + + // block until everything is done taskManagerSystem.awaitTermination() } @@ -804,7 +814,7 @@ object TaskManager { } getOrElse { LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.") LOG.error("CLI parsing failed. Usage: " + parser.usage) - sys.exit(FAILURE_RETURN_CODE) + sys.exit(STARTUP_FAILURE_RETURN_CODE) } } @@ -979,7 +989,43 @@ object TaskManager { system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME) } - def checkTempDirs(tmpDirs: Array[String]): Unit = { + // -------------------------------------------------------------------------- + // Resolving the TaskManager actor + // -------------------------------------------------------------------------- + + /** + * Resolves the TaskManager actor reference in a blocking fashion. + * + * @param taskManagerUrl The akka URL of the TaskManager. + * @param system The local actor system that should perform the lookup. 
+ * @param timeout The maximum time to wait until the lookup fails. + * @throws java.io.IOException Thrown, if the lookup fails. + * @return The ActorRef to the TaskManager + */ + @throws(classOf[IOException]) + def getTaskManagerRemoteReference(taskManagerUrl: String, + system: ActorSystem, + timeout: FiniteDuration): ActorRef = { + try { + val future = AkkaUtils.getReference(taskManagerUrl, system, timeout) + Await.result(future, timeout) + } + catch { + case e @ (_ : ActorNotFound | _ : TimeoutException) => + throw new IOException( + s"TaskManager at $taskManagerUrl not reachable. " + + s"Please make sure that the TaskManager is running and its port is reachable.", e) + + case e: IOException => + throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e) + } + } + + // -------------------------------------------------------------------------- + // Miscellaneous Utilities + // -------------------------------------------------------------------------- + + private def checkTempDirs(tmpDirs: Array[String]): Unit = { tmpDirs.zipWithIndex.foreach { case (dir: String, _) => val file = new File(dir) @@ -1010,6 +1056,12 @@ object TaskManager { } } + /** + * Gets the memory footprint of the JVM in a string representation. + * + * @param memoryMXBean The memory management bean used to access the memory statistics. + * @return A string describing how much heap memory and direct memory are allocated and used. + */ private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String = { val heap = memoryMXBean.getHeapMemoryUsage val nonHeap = memoryMXBean.getNonHeapMemoryUsage @@ -1026,6 +1078,12 @@ object TaskManager { s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]" } + /** + * Gets the garbage collection statistics from the JVM. + * + * @param gcMXBeans The collection of garbage collector beans. + * @return A string denoting the number of times and total elapsed time in garbage collection. 
+ */ private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]) : String = { val beans = gcMXBeans map { http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java index 74ed938..ae3a771 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java @@ -27,14 +27,19 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.junit.Test; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import scala.Some; import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; @@ -43,33 +48,45 @@ import java.util.concurrent.TimeUnit; */ public class JobManagerProcessReapingTest { - private static final int JOB_MANAGER_PORT = 56532; - @Test public void testReapProcessOnFailure() { Process jmProcess = null; ActorSystem localSystem = null; + final StringWriter processOutput = new StringWriter(); + try { String javaCommand = getJavaCommandPath(); // check that we run this test only if the java command // is available on this machine if (javaCommand == null) { + System.out.println("---- 
Skipping JobManagerProcessReapingTest : Could not find java executable ----"); return; } + // create a logging file for the process + File tempLogFile = File.createTempFile("testlogconfig", "properties"); + tempLogFile.deleteOnExit(); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + int jobManagerPort = NetUtils.getAvailablePort(); + // start a JobManger process String[] command = new String[] { javaCommand, - "-Dlog.level=OFF", + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms256m", "-Xmx256m", "-classpath", getCurrentClasspath(), - JobManagerTestEntryPoint.class.getName() + JobManagerTestEntryPoint.class.getName(), + String.valueOf(jobManagerPort) }; + // spawn the process and collect its output ProcessBuilder bld = new ProcessBuilder(command); jmProcess = bld.start(); + new PipeForwarder(jmProcess.getErrorStream(), processOutput); // start another actor system so we can send something to the JobManager Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", 0); @@ -82,8 +99,8 @@ public class JobManagerProcessReapingTest { for (int i = 0; i < 20; i++) { try { jobManagerRef = JobManager.getJobManagerRemoteReference( - new InetSocketAddress("localhost", JOB_MANAGER_PORT), - localSystem, new FiniteDuration(20, TimeUnit.SECONDS)); + new InetSocketAddress("localhost", jobManagerPort), + localSystem, new FiniteDuration(5, TimeUnit.SECONDS)); break; } catch (Throwable t) { @@ -117,8 +134,14 @@ public class JobManagerProcessReapingTest { } catch (Exception e) { e.printStackTrace(); + printProcessLog(processOutput.toString()); fail(e.getMessage()); } + catch (Error e) { + e.printStackTrace(); + printProcessLog(processOutput.toString()); + throw e; + } finally { if (jmProcess != null) { jmProcess.destroy(); @@ -129,17 +152,24 @@ public class JobManagerProcessReapingTest { } } + private static void printProcessLog(String log) { + System.out.println("-----------------------------------------"); + 
System.out.println(" BEGIN SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + System.out.println(log); + System.out.println("-----------------------------------------"); + System.out.println(" END SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + } + // -------------------------------------------------------------------------------------------- public static class JobManagerTestEntryPoint { public static void main(String[] args) { try { - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, JOB_MANAGER_PORT); - - JobManager.runJobManager(cfg, ExecutionMode.CLUSTER(), "localhost", JOB_MANAGER_PORT); + int port = Integer.parseInt(args[0]); + JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), "localhost", port); System.exit(0); } catch (Throwable t) { @@ -147,4 +177,33 @@ public class JobManagerProcessReapingTest { } } } + + private static class PipeForwarder extends Thread { + + private final StringWriter target; + private final InputStream source; + + public PipeForwarder(InputStream source, StringWriter target) { + super("Pipe Forwarder"); + setDaemon(true); + + this.source = source; + this.target = target; + + start(); + } + + @Override + public void run() { + try { + int next; + while ((next = source.read()) != -1) { + target.write(next); + } + } + catch (IOException e) { + // terminate + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java new 
file mode 100644 index 0000000..b92cfd4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; + +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.junit.Test; + +import scala.Some; +import scala.Tuple2; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; 
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; + +/** + * Tests that the TaskManager process properly exits when the TaskManager actor dies. + */ +public class TaskManagerProcessReapingTest { + + private static final String TASK_MANAGER_ACTOR_NAME = "TEST_TM"; + + @Test + public void testReapProcessOnFailure() { + Process taskManagerProcess = null; + ActorSystem jmActorSystem = null; + + final StringWriter processOutput = new StringWriter(); + + try { + String javaCommand = getJavaCommandPath(); + + // check that we run this test only if the java command + // is available on this machine + if (javaCommand == null) { + System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----"); + return; + } + + // create a logging file for the process + File tempLogFile = File.createTempFile("testlogconfig", "properties"); + tempLogFile.deleteOnExit(); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + final int jobManagerPort = NetUtils.getAvailablePort(); + + // start a JobManager + Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); + jmActorSystem = AkkaUtils.createActorSystem( + new Configuration(), new Some<Tuple2<String, Object>>(localAddress)); + + JobManager.startJobManagerActors(new Configuration(), jmActorSystem); + + final int taskManagerPort = NetUtils.getAvailablePort(); + + // start the task manager process + String[] command = new String[] { + javaCommand, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), + "-Xms256m", "-Xmx256m", + "-classpath", getCurrentClasspath(), + TaskManagerTestEntryPoint.class.getName(), + String.valueOf(jobManagerPort), String.valueOf(taskManagerPort) + }; + + ProcessBuilder bld = new ProcessBuilder(command); + taskManagerProcess = bld.start(); + new 
PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); + + // grab the reference to the TaskManager. try multiple times, until the process + // is started and the TaskManager is up + String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s", + "127.0.0.1", taskManagerPort, TASK_MANAGER_ACTOR_NAME); + + ActorRef taskManagerRef = null; + for (int i = 0; i < 20; i++) { + try { + taskManagerRef = TaskManager.getTaskManagerRemoteReference( + taskManagerActorName, jmActorSystem, new FiniteDuration(5, TimeUnit.SECONDS)); + break; + } + catch (Throwable t) { + // TaskManager probably not ready yet + } + Thread.sleep(500); + } + + assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess)); + assertTrue("TaskManager process did not launch the TaskManager properly", taskManagerRef != null); + + // kill the TaskManager actor + taskManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + // wait for max 5 seconds for the process to terminate + { + long now = System.currentTimeMillis(); + long deadline = now + 5000; + + while (now < deadline && isProcessAlive(taskManagerProcess)) { + Thread.sleep(100); + now = System.currentTimeMillis(); + } + } + + assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess)); + + int returnCode = taskManagerProcess.exitValue(); + assertEquals("TaskManager died, but not because of the process reaper", + TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode); + } + catch (Exception e) { + e.printStackTrace(); + printProcessLog(processOutput.toString()); + fail(e.getMessage()); + } + catch (Error e) { + e.printStackTrace(); + printProcessLog(processOutput.toString()); + throw e; + } + finally { + if (taskManagerProcess != null) { + taskManagerProcess.destroy(); + } + if (jmActorSystem != null) { + jmActorSystem.shutdown(); + } + } + } + + private static void printProcessLog(String log) { + 
System.out.println("-----------------------------------------"); + System.out.println(" BEGIN SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + System.out.println(log); + System.out.println("-----------------------------------------"); + System.out.println(" END SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + } + + // -------------------------------------------------------------------------------------------- + + public static class TaskManagerTestEntryPoint { + + public static void main(String[] args) { + try { + int jobManagerPort = Integer.parseInt(args[0]); + int taskManagerPort = Integer.parseInt(args[1]); + + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); + cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); + cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + + TaskManager.startActor("localhost", taskManagerPort, cfg, TASK_MANAGER_ACTOR_NAME); + + // wait forever + Object lock = new Object(); + synchronized (lock) { + lock.wait(); + } + } + catch (Throwable t) { + System.exit(1); + } + } + } + + private static class PipeForwarder extends Thread { + + private final StringWriter target; + private final InputStream source; + + public PipeForwarder(InputStream source, StringWriter target) { + super("Pipe Forwarder"); + setDaemon(true); + + this.source = source; + this.target = target; + + start(); + } + + @Override + public void run() { + try { + int next; + while ((next = source.read()) != -1) { + target.write(next); + } + } + catch (IOException e) { + // terminate + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index fc52417..ca05416 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -20,14 +20,18 @@ package org.apache.flink.runtime.testutils; import static org.junit.Assert.fail; +import java.io.BufferedWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.PrintWriter; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -204,4 +208,24 @@ public class CommonTestUtils { return true; } } + + public static void printLog4jDebugConfig(File file) throws IOException { + FileWriter fw = new FileWriter(file); + try { + PrintWriter writer = new PrintWriter(fw); + + writer.println("log4j.rootLogger=DEBUG, console"); + writer.println("log4j.appender.console=org.apache.log4j.ConsoleAppender"); + writer.println("log4j.appender.console.target = System.err"); + writer.println("log4j.appender.console.layout=org.apache.log4j.PatternLayout"); + writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n"); + writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF"); + + writer.flush(); + writer.close(); + } + finally { + fw.close(); + } + } }