[tests] Add process reaping test for TaskManager, improve process reaping test 
for JobManager.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1230bcaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1230bcaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1230bcaa

Branch: refs/heads/master
Commit: 1230bcaa0f531f6260f291dd066f64fe52cc6708
Parents: 70df028
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 24 12:23:59 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 24 12:23:59 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  94 ++++----
 .../flink/runtime/taskmanager/TaskManager.scala |  68 +++++-
 .../JobManagerProcessReapingTest.java           |  83 ++++++-
 .../TaskManagerProcessReapingTest.java          | 236 +++++++++++++++++++
 .../runtime/testutils/CommonTestUtils.java      |  24 ++
 5 files changed, 447 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a1642b4..2671f2d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -94,17 +94,18 @@ class JobManager(val configuration: Configuration,
                  val profiler: Option[ActorRef],
                  val defaultExecutionRetries: Int,
                  val delayBetweenRetries: Long,
-                 implicit val timeout: FiniteDuration)
+                 val timeout: FiniteDuration)
   extends Actor with ActorLogMessages with ActorLogging {
 
-  import context._
-
-  val LOG = JobManager.LOG
-
-  // List of current jobs running
-  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, 
JobInfo)]()
+  /** Reference to the log, for debugging */
+  protected val LOG = JobManager.LOG
 
+  /** List of currently running jobs */
+  protected val currentJobs = scala.collection.mutable.HashMap[JobID, 
(ExecutionGraph, JobInfo)]()
 
+  /**
+   * Run when the job manager is started. Simply logs an informational message.
+   */
   override def preStart(): Unit = {
     LOG.info(s"Starting JobManager at ${self.path}.")
   }
@@ -138,6 +139,11 @@ class JobManager(val configuration: Configuration,
     }
   }
 
+  /**
+   * Central work method of the JobManager actor. Receives messages and reacts 
to them.
+   *
+   * @return
+   */
   override def receiveWithLogMessages: Receive = {
 
     case RegisterTaskManager(connectionInfo, hardwareInformation, 
numberOfSlots) =>
@@ -182,7 +188,7 @@ class JobManager(val configuration: Configuration,
           // execute the cancellation asynchronously
           Future {
             executionGraph.cancel()
-          }
+          }(context.dispatcher)
 
           sender ! CancellationSuccess(jobID)
         case None =>
@@ -198,10 +204,12 @@ class JobManager(val configuration: Configuration,
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
             val originalSender = sender
+
             Future {
               val result = executionGraph.updateState(taskExecutionState)
               originalSender ! result
-            }
+            }(context.dispatcher)
+
             sender ! true
           case None => log.error("Cannot find execution graph for ID {} to 
change state to {}.",
             taskExecutionState.getJobID, taskExecutionState.getExecutionState)
@@ -603,6 +611,7 @@ object JobManager {
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
     checkJavaVersion()
 
+    // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: ExecutionMode,
          listeningHost: String, listeningPort: Int) =
@@ -617,16 +626,17 @@ object JobManager {
       }
     }
 
-    // we may want to check that the JobManager hostname is in the config
+    // we want to check that the JobManager hostname is in the config
     // if it is not in there, the actor system will bind to the loopback 
interface's
     // address and will not be reachable from anyone remote
     if (listeningHost == null) {
       val message = "Config parameter '" + 
ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
-        "' is missing (hostname or address to bind JobManager to)."
+        "' is missing (hostname/address to bind JobManager to)."
       LOG.error(message)
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
+    // run the job manager
     try {
       if (SecurityUtils.isSecurityEnabled) {
         LOG.info("Security is enabled. Starting secure JobManager.")
@@ -647,13 +657,28 @@ object JobManager {
     }
   }
 
-
+  /**
+   * Starts and runs the JobManager with all its components. First, this 
method starts a
+   * dedicated actor system for the JobManager. Second, it starts all 
components of the
+   * JobManager (including library cache, instance manager, scheduler). 
Finally, it starts
+   * the JobManager actor itself.
+   *
+   * This method blocks indefinitely (or until the JobManager's actor system 
is shut down).
+   *
+   * @param configuration The configuration object for the JobManager.
+   * @param executionMode The execution mode in which to run. Execution mode 
LOCAL will spawn an
+   *                      additional TaskManager in the same process.
+   * @param listeningAddress The hostname where the JobManager should listen 
for messages.
+   * @param listeningPort The port where the JobManager should listen for 
messages.
+   */
   def runJobManager(configuration: Configuration,
                     executionMode: ExecutionMode,
                     listeningAddress: String,
                     listeningPort: Int) : Unit = {
 
     LOG.info("Starting JobManager")
+
+    // Bring up the job manager actor system first, bind it to the given 
address.
     LOG.debug("Starting JobManager actor system")
 
     val jobManagerSystem = try {
@@ -674,18 +699,18 @@ object JobManager {
     }
 
     try {
-      LOG.debug("Starting JobManager actor")
-
       // bring up the job manager actor
+      LOG.debug("Starting JobManager actor")
       val (jobManager, archiver) = startJobManagerActors(configuration, 
jobManagerSystem)
 
-      // start a process reaper that watches the JobManager
+      // start a process reaper that watches the JobManager. If the JobManager 
actor dies,
+      // the process reaper will kill the JVM process (to ensure easy failure 
detection)
       jobManagerSystem.actorOf(
         Props(classOf[ProcessReaper], jobManager, LOG, 
RUNTIME_FAILURE_RETURN_CODE),
         "JobManager_Process_Reaper")
 
       // bring up a local task manager, if needed
-      if(executionMode.equals(LOCAL)){
+      if (executionMode.equals(LOCAL)) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode 
execution")
 
         TaskManager.startActorWithConfiguration("", 
TaskManager.TASK_MANAGER_NAME, configuration,
@@ -757,15 +782,22 @@ object JobManager {
   }
 
   /**
-   * Extracts the job manager configuration values from a configuration 
instance.
+   * Create the job manager components as (instanceManager, scheduler, 
libraryCacheManager,
+   *              archiverProps, accumulatorManager, profiler, 
defaultExecutionRetries,
+   *              delayBetweenRetries, timeout)
    *
-   * @param configuration Object with the user provided configuration values
-   * @return Tuple of (number of archived jobs, profiling enabled, cleanup 
interval of the library
-   *         cache manager, default number of execution retries, delay between 
retries)
+   * @param configuration The configuration from which to parse the config 
values.
+   * @return The members for a default JobManager.
    */
-  def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, 
Int, Long) = {
+  def createJobManagerComponents(configuration: Configuration) :
+    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, 
Int) = {
+
+    val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
+
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
+
     val profilingEnabled = 
configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, false)
 
     val cleanupInterval = configuration.getLong(
@@ -780,26 +812,6 @@ object JobManager {
       ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
 
-    (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries)
-  }
-
-  /**
-   * Create the job manager members as (instanceManager, scheduler, 
libraryCacheManager,
-   *              archiverProps, accumulatorManager, profiler, 
defaultExecutionRetries,
-   *              delayBetweenRetries, timeout)
-   *
-   * @param configuration The configuration from which to parse the config 
values.
-   * @return The members for a default JobManager.
-   */
-  def createJobManagerComponents(configuration: Configuration) :
-    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
-      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, 
Int) = {
-
-    val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
-
-    val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries) =
-      parseConfiguration(configuration)
-
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 
     val profilerProps: Option[Props] = if (profilingEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9095eae..52a16bd 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerProfilerMessages
 .{UnregisterProfilingListener, UnmonitorTask, MonitorTask, 
RegisterProfilingListener}
 import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -60,7 +61,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.util.ExceptionUtils
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.Future
+import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 import scala.language.postfixOps
@@ -721,7 +722,9 @@ import scala.collection.JavaConverters._
 object TaskManager {
 
   val LOG = LoggerFactory.getLogger(classOf[TaskManager])
-  val FAILURE_RETURN_CODE = -1
+
+  val STARTUP_FAILURE_RETURN_CODE = 1
+  val RUNTIME_FAILURE_RETURN_CODE = 2
 
   val TASK_MANAGER_NAME = "taskmanager"
   val PROFILER_NAME = "profiler"
@@ -750,9 +753,16 @@ object TaskManager {
   def startActor(hostname: String, port: Int, configuration: Configuration,
                  taskManagerName: String) : Unit = {
 
-    val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, 
configuration,
+    val (taskManagerSystem, taskManager) = startActorSystemAndActor(hostname, 
port, configuration,
       taskManagerName, localAkkaCommunication = false, 
localTaskManagerCommunication = false)
 
+    // start a process reaper that watches the TaskManager. If the TaskManager 
actor dies,
+    // the process reaper will kill the JVM process (to ensure easy failure 
detection)
+    taskManagerSystem.actorOf(
+      Props(classOf[ProcessReaper], taskManager, LOG, 
RUNTIME_FAILURE_RETURN_CODE),
+      "TaskManager_Process_Reaper")
+
+    // block until everything is done
     taskManagerSystem.awaitTermination()
   }
 
@@ -804,7 +814,7 @@ object TaskManager {
     } getOrElse {
       LOG.error(s"TaskManager parseArgs called with ${args.mkString(" ")}.")
       LOG.error("CLI parsing failed. Usage: " + parser.usage)
-      sys.exit(FAILURE_RETURN_CODE)
+      sys.exit(STARTUP_FAILURE_RETURN_CODE)
     }
   }
 
@@ -979,7 +989,43 @@ object TaskManager {
     system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, 
reportInterval), PROFILER_NAME)
   }
 
-  def checkTempDirs(tmpDirs: Array[String]): Unit = {
+  // --------------------------------------------------------------------------
+  //  Resolving the TaskManager actor
+  // --------------------------------------------------------------------------
+
+  /**
+   * Resolves the TaskManager actor reference in a blocking fashion.
+   *
+   * @param taskManagerUrl The akka URL of the TaskManager.
+   * @param system The local actor system that should perform the lookup.
+   * @param timeout The maximum time to wait until the lookup fails.
+   * @throws java.io.IOException Thrown, if the lookup fails.
+   * @return The ActorRef to the TaskManager
+   */
+  @throws(classOf[IOException])
+  def getTaskManagerRemoteReference(taskManagerUrl: String,
+                                   system: ActorSystem,
+                                   timeout: FiniteDuration): ActorRef = {
+    try {
+      val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
+      Await.result(future, timeout)
+    }
+    catch {
+      case e @ (_ : ActorNotFound | _ : TimeoutException) =>
+        throw new IOException(
+          s"TaskManager at $taskManagerUrl not reachable. " +
+            s"Please make sure that the TaskManager is running and its port is 
reachable.", e)
+
+      case e: IOException =>
+        throw new IOException("Could not connect to TaskManager at " + 
taskManagerUrl, e)
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  //  Miscellaneous Utilities
+  // --------------------------------------------------------------------------
+
+  private def checkTempDirs(tmpDirs: Array[String]): Unit = {
     tmpDirs.zipWithIndex.foreach {
       case (dir: String, _) =>
         val file = new File(dir)
@@ -1010,6 +1056,12 @@ object TaskManager {
     }
   }
 
+  /**
+   * Gets the memory footprint of the JVM in a string representation.
+   *
+   * @param memoryMXBean The memory management bean used to access the memory 
statistics.
+   * @return A string describing how much heap memory and direct memory are 
allocated and used.
+   */
   private def getMemoryUsageStatsAsString(memoryMXBean: MemoryMXBean): String 
= {
     val heap = memoryMXBean.getHeapMemoryUsage
     val nonHeap = memoryMXBean.getNonHeapMemoryUsage
@@ -1026,6 +1078,12 @@ object TaskManager {
       s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB 
(used/committed/max)]"
   }
 
+  /**
+   * Gets the garbage collection statistics from the JVM.
+   *
+   * @param gcMXBeans The collection of garbage collector beans.
+   * @return A string denoting the number of times and total elapsed time in 
garbage collection.
+   */
   private def getGarbageCollectorStatsAsString(gcMXBeans: 
Iterable[GarbageCollectorMXBean])
   : String = {
     val beans = gcMXBeans map {

http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 74ed938..ae3a771 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,14 +27,19 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
@@ -43,33 +48,45 @@ import java.util.concurrent.TimeUnit;
  */
 public class JobManagerProcessReapingTest {
 
-       private static final int JOB_MANAGER_PORT = 56532;
-
        @Test
        public void testReapProcessOnFailure() {
                Process jmProcess = null;
                ActorSystem localSystem = null;
 
+               final StringWriter processOutput = new StringWriter();
+
                try {
                        String javaCommand = getJavaCommandPath();
 
                        // check that we run this test only if the java command
                        // is available on this machine
                        if (javaCommand == null) {
+                               System.out.println("---- Skipping 
JobManagerProcessReapingTest : Could not find java executable ----");
                                return;
                        }
 
+                       // create a logging file for the process
+                       File tempLogFile = File.createTempFile("testlogconfig", 
"properties");
+                       tempLogFile.deleteOnExit();
+                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+                       int jobManagerPort = NetUtils.getAvailablePort();
+
                        // start a JobManger process
                        String[] command = new String[] {
                                        javaCommand,
-                                       "-Dlog.level=OFF",
+                                       "-Dlog.level=DEBUG",
+                                       "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms256m", "-Xmx256m",
                                        "-classpath", getCurrentClasspath(),
-                                       JobManagerTestEntryPoint.class.getName()
+                                       
JobManagerTestEntryPoint.class.getName(),
+                                       String.valueOf(jobManagerPort)
                        };
 
+                       // spawn the process and collect its output
                        ProcessBuilder bld = new ProcessBuilder(command);
                        jmProcess = bld.start();
+                       new PipeForwarder(jmProcess.getErrorStream(), 
processOutput);
 
                        // start another actor system so we can send something 
to the JobManager
                        Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", 0);
@@ -82,8 +99,8 @@ public class JobManagerProcessReapingTest {
                        for (int i = 0; i < 20; i++) {
                                try {
                                        jobManagerRef = 
JobManager.getJobManagerRemoteReference(
-                                                       new 
InetSocketAddress("localhost", JOB_MANAGER_PORT),
-                                                       localSystem, new 
FiniteDuration(20, TimeUnit.SECONDS));
+                                                       new 
InetSocketAddress("localhost", jobManagerPort),
+                                                       localSystem, new 
FiniteDuration(5, TimeUnit.SECONDS));
                                        break;
                                }
                                catch (Throwable t) {
@@ -117,8 +134,14 @@ public class JobManagerProcessReapingTest {
                }
                catch (Exception e) {
                        e.printStackTrace();
+                       printProcessLog(processOutput.toString());
                        fail(e.getMessage());
                }
+               catch (Error e) {
+                       e.printStackTrace();
+                       printProcessLog(processOutput.toString());
+                       throw e;
+               }
                finally {
                        if (jmProcess != null) {
                                jmProcess.destroy();
@@ -129,17 +152,24 @@ public class JobManagerProcessReapingTest {
                }
        }
 
+       private static void printProcessLog(String log) {
+               System.out.println("-----------------------------------------");
+               System.out.println("       BEGIN SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+               System.out.println(log);
+               System.out.println("-----------------------------------------");
+               System.out.println("        END SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        public static class JobManagerTestEntryPoint {
 
                public static void main(String[] args) {
                        try {
-                               Configuration cfg = new Configuration();
-                               
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-                               
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, JOB_MANAGER_PORT);
-
-                               JobManager.runJobManager(cfg, 
ExecutionMode.CLUSTER(), "localhost", JOB_MANAGER_PORT);
+                               int port = Integer.parseInt(args[0]);
+                               JobManager.runJobManager(new Configuration(), 
ExecutionMode.CLUSTER(), "localhost", port);
                                System.exit(0);
                        }
                        catch (Throwable t) {
@@ -147,4 +177,33 @@ public class JobManagerProcessReapingTest {
                        }
                }
        }
+
+       private static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+                       setDaemon(true);
+
+                       this.source = source;
+                       this.target = target;
+
+                       start();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               int next;
+                               while ((next = source.read()) != -1) {
+                                       target.write(next);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
new file mode 100644
index 0000000..b92cfd4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+
+/**
+ * Tests that the TaskManager process properly exits when the TaskManager actor 
dies.
+ */
+public class TaskManagerProcessReapingTest {
+
+       private static final String TASK_MANAGER_ACTOR_NAME = "TEST_TM";
+
+       @Test
+       public void testReapProcessOnFailure() {
+               Process taskManagerProcess = null;
+               ActorSystem jmActorSystem = null;
+
+               final StringWriter processOutput = new StringWriter();
+
+               try {
+                       String javaCommand = getJavaCommandPath();
+
+                       // check that we run this test only if the java command
+                       // is available on this machine
+                       if (javaCommand == null) {
+                               System.out.println("---- Skipping 
TaskManagerProcessReapingTest : Could not find java executable ----");
+                               return;
+                       }
+
+                       // create a logging file for the process
+                       File tempLogFile = File.createTempFile("testlogconfig", 
"properties");
+                       tempLogFile.deleteOnExit();
+                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+                       final int jobManagerPort = NetUtils.getAvailablePort();
+
+                       // start a JobManager
+                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
+                       jmActorSystem = AkkaUtils.createActorSystem(
+                                       new Configuration(), new 
Some<Tuple2<String, Object>>(localAddress));
+
+                       JobManager.startJobManagerActors(new Configuration(), 
jmActorSystem);
+
+                       final int taskManagerPort = NetUtils.getAvailablePort();
+
+                       // start the task manager process
+                       String[] command = new String[] {
+                                       javaCommand,
+                                       "-Dlog.level=DEBUG",
+                                       "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
+                                       "-Xms256m", "-Xmx256m",
+                                       "-classpath", getCurrentClasspath(),
+                                       
TaskManagerTestEntryPoint.class.getName(),
+                                       String.valueOf(jobManagerPort), 
String.valueOf(taskManagerPort)
+                       };
+
+                       ProcessBuilder bld = new ProcessBuilder(command);
+                       taskManagerProcess = bld.start();
+                       new PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
+
+                       // grab the reference to the TaskManager. try multiple 
times, until the process
+                       // is started and the TaskManager is up
+                       String taskManagerActorName = 
String.format("akka.tcp://flink@%s:%d/user/%s",
+                                       "127.0.0.1", taskManagerPort, 
TASK_MANAGER_ACTOR_NAME);
+
+                       ActorRef taskManagerRef = null;
+                       for (int i = 0; i < 20; i++) {
+                               try {
+                                       taskManagerRef = 
TaskManager.getTaskManagerRemoteReference(
+                                                       taskManagerActorName, 
jmActorSystem, new FiniteDuration(5, TimeUnit.SECONDS));
+                                       break;
+                               }
+                               catch (Throwable t) {
+                                       // TaskManager probably not ready yet
+                               }
+                               Thread.sleep(500);
+                       }
+
+                       assertTrue("TaskManager process died", 
isProcessAlive(taskManagerProcess));
+                       assertTrue("TaskManager process did not launch the 
TaskManager properly", taskManagerRef != null);
+
+                       // kill the TaskManager actor
+                       taskManagerRef.tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+
+                       // wait for max 5 seconds for the process to terminate
+                       {
+                               long now = System.currentTimeMillis();
+                               long deadline = now + 5000;
+
+                               while (now < deadline && 
isProcessAlive(taskManagerProcess)) {
+                                       Thread.sleep(100);
+                                       now = System.currentTimeMillis();
+                               }
+                       }
+
+                       assertFalse("TaskManager process did not terminate upon 
actor death", isProcessAlive(taskManagerProcess));
+
+                       int returnCode = taskManagerProcess.exitValue();
+                       assertEquals("TaskManager died, but not because of the 
process reaper",
+                                       
TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       printProcessLog(processOutput.toString());
+                       fail(e.getMessage());
+               }
+               catch (Error e) {
+                       e.printStackTrace();
+                       printProcessLog(processOutput.toString());
+                       throw e;
+               }
+               finally {
+                       if (taskManagerProcess != null) {
+                               taskManagerProcess.destroy();
+                       }
+                       if (jmActorSystem != null) {
+                               jmActorSystem.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * Prints the given log text to standard out, framed by BEGIN/END banner lines.
+        *
+        * @param log the text to print between the banners
+        */
+       private static void printProcessLog(String log) {
+               final String rule = "-----------------------------------------";
+
+               final String[] output = {
+                       rule,
+                       "       BEGIN SPAWNED PROCESS LOG",
+                       rule,
+                       log,
+                       rule,
+                       "        END SPAWNED PROCESS LOG",
+                       rule
+               };
+
+               for (String line : output) {
+                       System.out.println(line);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Entry point for the spawned child JVM: configures and starts a TaskManager actor,
+        * then blocks the main thread indefinitely.
+        *
+        * <p>Expects two integer arguments: {@code args[0]} is written to the configuration as
+        * the JobManager IPC port, {@code args[1]} is the port handed to
+        * {@code TaskManager.startActor}.
+        */
+       public static class TaskManagerTestEntryPoint {
+
+               public static void main(String[] args) {
+                       try {
+                               int jobManagerPort = Integer.parseInt(args[0]);
+                               int taskManagerPort = Integer.parseInt(args[1]);
+
+                               Configuration cfg = new Configuration();
+                               cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                               cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+                               cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                               cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+
+                               TaskManager.startActor("localhost", taskManagerPort, cfg, TASK_MANAGER_ACTOR_NAME);
+
+                               // wait forever; Object.wait() may wake up spuriously, so re-enter it in a loop
+                               Object lock = new Object();
+                               synchronized (lock) {
+                                       while (true) {
+                                               lock.wait();
+                                       }
+                               }
+                       }
+                       catch (Throwable t) {
+                               // report the cause on stderr before exiting, so a failed startup
+                               // (bad arguments, actor start failure, interrupt) is not silent
+                               t.printStackTrace();
+                               System.exit(1);
+                       }
+               }
+       }
+
+       /**
+        * Daemon thread that copies everything readable from an input stream into a
+        * {@link StringWriter} until the stream ends or reading fails.
+        * The thread starts itself at the end of the constructor.
+        */
+       private static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               /**
+                * Creates and immediately starts the forwarder.
+                *
+                * @param source the stream to drain
+                * @param target the writer that accumulates the decoded text
+                */
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+                       setDaemon(true);
+
+                       this.source = source;
+                       this.target = target;
+
+                       // both fields are assigned before the thread is started
+                       start();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               // Decode with the platform default charset rather than writing each raw
+                               // byte as a char, which garbles multi-byte characters. Reading through a
+                               // buffer also avoids one read call per byte.
+                               java.io.Reader reader = new java.io.InputStreamReader(source);
+                               char[] buffer = new char[1024];
+                               int numRead;
+                               while ((numRead = reader.read(buffer)) != -1) {
+                                       target.write(buffer, 0, numRead);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1230bcaa/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index fc52417..ca05416 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.testutils;
 
 import static org.junit.Assert.fail;
 
+import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 
@@ -204,4 +208,24 @@ public class CommonTestUtils {
                        return true;
                }
        }
+
+       /**
+        * Writes a log4j properties file that sets the root logger to DEBUG with a console
+        * appender targeting System.err, and turns off the Jetty logger.
+        *
+        * @param file the file to write the configuration into
+        * @throws IOException if the file cannot be opened, written, or closed
+        */
+       public static void printLog4jDebugConfig(File file) throws IOException {
+               FileWriter fw = new FileWriter(file);
+               try {
+                       PrintWriter writer = new PrintWriter(fw);
+
+                       writer.println("log4j.rootLogger=DEBUG, console");
+                       writer.println("log4j.appender.console=org.apache.log4j.ConsoleAppender");
+                       writer.println("log4j.appender.console.target = System.err");
+                       writer.println("log4j.appender.console.layout=org.apache.log4j.PatternLayout");
+                       writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n");
+                       writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF");
+
+                       // PrintWriter swallows IOExceptions; checkError() flushes and reports whether
+                       // any write failed, so the failure is surfaced instead of silently lost
+                       if (writer.checkError()) {
+                               throw new IOException("Failed to write log4j configuration to " + file);
+                       }
+               }
+               finally {
+                       // closes the underlying file whether or not writing succeeded
+                       fw.close();
+               }
+       }
 }

Reply via email to