Repository: flink
Updated Branches:
  refs/heads/master d6e9fa046 -> 4e52fe430


[FLINK-3134][yarn] asynchronous YarnJobManager heartbeats

- use AMRMClientAsync instead of AMRMClient
- handle allocation and startup of containers in callbacks
- remove handling and scheduling of the HeartbeatWithYarn message

The AMRMClientAsync uses one thread to communicate with the resource
manager and an additional thread to execute the callbacks.

This closes #1450.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e52fe43
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e52fe43
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e52fe43

Branch: refs/heads/master
Commit: 4e52fe4304566e5239996b3d48290e0c1f0772e8
Parents: d6e9fa0
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Dec 7 14:35:04 2015 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Dec 17 15:19:48 2015 +0100

----------------------------------------------------------------------
 .../flink/yarn/YARNSessionFIFOITCase.java       |   4 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   5 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   1 -
 .../apache/flink/yarn/ApplicationClient.scala   |   2 +
 .../org/apache/flink/yarn/YarnJobManager.scala  | 560 ++++++++++---------
 5 files changed, 301 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 69925f2..14ee8ec 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -284,14 +284,14 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                }
 
                // stateful termination check:
-               // wait until we saw a container being killed and AFTERWARDS a 
new one launced
+               // wait until we saw a container being killed and AFTERWARDS a 
new one launched
                boolean ok = false;
                do {
                        LOG.debug("Waiting for correct order of events. Output: 
{}", errContent.toString());
 
                        String o = errContent.toString();
                        int killedOff = o.indexOf("Container killed by the 
ApplicationMaster");
-                       if(killedOff != -1) {
+                       if (killedOff != -1) {
                                o = o.substring(killedOff);
                                ok = o.indexOf("Launching container") > 0;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 5ce528e..d3132d7 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -89,7 +89,10 @@ public abstract class YarnTestBase extends TestLogger {
 
        /** These strings are white-listed, overriding teh prohibited strings */
        protected final static String[] WHITELISTED_STRINGS = {
-                       "akka.remote.RemoteTransportExceptionNoStackTrace"
+                       "akka.remote.RemoteTransportExceptionNoStackTrace",
+                       // workaround for annoying InterruptedException logging:
+                   // https://issues.apache.org/jira/browse/YARN-1022
+                       "java.lang.InterruptedException"
        };
 
        // Temp directory which is deleted after the unit test.

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 3379325..a7f21e5 100644
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -47,7 +47,6 @@ import scala.concurrent.duration.FiniteDuration
   * @param defaultExecutionRetries Number of default execution retries
   * @param delayBetweenRetries Delay between retries
   * @param timeout Timeout for futures
-  * @param mode StreamingMode in which the system shall be started
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
 class TestingYarnJobManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 79717ef..8b2c06f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -21,6 +21,8 @@ package org.apache.flink.yarn
 import java.util.UUID
 
 import akka.actor._
+import akka.pattern
+import akka.util.Timeout
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 8494f08..735f567 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -48,7 +48,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.client.api.NMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.YarnException
@@ -108,7 +109,7 @@ class YarnJobManager(
   val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =
-    
if(flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 
null) == null) {
+    if 
(flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 
null) == null) {
       DEFAULT_YARN_HEARTBEAT_DELAY
     } else {
       FiniteDuration(
@@ -123,9 +124,9 @@ class YarnJobManager(
   val detached = 
java.lang.Boolean.valueOf(env.get(FlinkYarnClientBase.ENV_DETACHED))
   var stopWhenJobFinished: JobID = null
 
-  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
+  var rmClientOption: Option[AMRMClientAsync[ContainerRequest]] = None
   var nmClientOption: Option[NMClient] = None
-  var messageListener:Option[ActorRef] = None
+  var messageListener: Option[ActorRef] = None
   var containerLaunchContext: Option[ContainerLaunchContext] = None
 
   var runningContainers = 0 // number of currently running containers
@@ -155,27 +156,28 @@ class YarnJobManager(
 
       rmClientOption foreach {
         rmClient =>
-          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
+          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover {
             case t: Throwable => log.error("Could not unregister the 
application master.", t)
           }
 
-          Try(rmClient.close()).recover{
-            case t:Throwable => log.error("Could not close the AMRMClient.", t)
+          Try(rmClient.stop()).recover {
+            case t: Throwable => log.error("Could not close the AMRMClient.", 
t)
           }
+
       }
 
       rmClientOption = None
 
       nmClientOption foreach {
         nmClient =>
-        Try(nmClient.close()).recover{
-          case t: Throwable => log.error("Could not close the NMClient.", t)
-        }
+          Try(nmClient.close()).recover {
+            case t: Throwable => log.error("Could not close the NMClient.", t)
+          }
       }
 
       nmClientOption = None
       messageListener foreach {
-          _ ! decorateMessage(JobManagerStopped)
+        _ ! decorateMessage(JobManagerStopped)
       }
 
       context.system.shutdown()
@@ -194,6 +196,17 @@ class YarnJobManager(
       val jobId = msg.jobId
       log.info(s"ApplicatonMaster will shut down YARN session when job $jobId 
has finished.")
       stopWhenJobFinished = jobId
+      // trigger regular job status messages (if this is a per-job yarn 
cluster)
+      if (stopWhenJobFinished != null) {
+        context.system.scheduler.schedule(0 seconds,
+          YARN_HEARTBEAT_DELAY,
+          new Runnable {
+            override def run(): Unit = {
+              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
+            }
+          }
+        )
+      }
       sender() ! decorateMessage(Acknowledge)
 
 
@@ -209,19 +222,19 @@ class YarnJobManager(
 
     case jnf: JobNotFound =>
       log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
-      if(stopWhenJobFinished == null) {
+      if (stopWhenJobFinished == null) {
         log.warn("The ApplicationMaster didn't expect to receive this message")
       }
 
     case jobStatus: CurrentJobStatus =>
-      if(stopWhenJobFinished == null) {
+      if (stopWhenJobFinished == null) {
         log.warn(s"Received job status $jobStatus which wasn't requested.")
       } else {
-        if(stopWhenJobFinished != jobStatus.jobID) {
+        if (stopWhenJobFinished != jobStatus.jobID) {
           log.warn(s"Received job status for job ${jobStatus.jobID} but 
expected status for " +
             s"job $stopWhenJobFinished")
         } else {
-          if(jobStatus.status.isTerminalState) {
+          if (jobStatus.status.isTerminalState) {
             log.info(s"Job with ID ${jobStatus.jobID} is in terminal state 
${jobStatus.status}. " +
               s"Shutting down YARN session")
             if (jobStatus.status == JobStatus.FINISHED) {
@@ -242,238 +255,18 @@ class YarnJobManager(
           }
         }
       }
-
-    case HeartbeatWithYarn =>
-      // piggyback on the YARN heartbeat to check if the job has finished
-      if(stopWhenJobFinished != null) {
-        self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
-      }
-      rmClientOption match {
-        case Some(rmClient) =>
-          log.debug("Send heartbeat to YARN")
-          val response = rmClient.allocate(runningContainers.toFloat / 
numTaskManagers)
-
-          // ---------------------------- handle YARN responses -------------
-
-          val newlyAllocatedContainers = 
response.getAllocatedContainers.asScala
-
-          newlyAllocatedContainers.foreach {
-            container => log.info(s"Got new container for allocation: 
${container.getId}")
-          }
-
-          allocatedContainersList ++= newlyAllocatedContainers
-          numPendingRequests = math.max(0, numPendingRequests - 
newlyAllocatedContainers.length)
-
-          val completedContainerStatuses = 
response.getCompletedContainersStatuses.asScala
-          val idStatusMap = completedContainerStatuses
-            .map(status => (status.getContainerId, status)).toMap
-
-          completedContainerStatuses.foreach {
-            status => log.info(s"Container ${status.getContainerId} is 
completed " +
-              s"with diagnostics: ${status.getDiagnostics}")
-          }
-
-          // get failed containers (returned containers are also completed, so 
we have to
-          // distinguish if it was running before).
-          val (completedContainers, remainingRunningContainers) = 
runningContainersList
-            .partition(idStatusMap contains _.getId)
-
-          completedContainers.foreach {
-            container =>
-              val status = idStatusMap(container.getId)
-              failedContainers += 1
-              runningContainers -= 1
-              log.info(s"Container ${status.getContainerId} was a running 
container. " +
-                s"Total failed containers $failedContainers.")
-              val detail = status.getExitStatus match {
-                case -103 => "Vmem limit exceeded";
-                case -104 => "Pmem limit exceeded";
-                case _ => ""
-              }
-              messageListener foreach {
-                _ ! decorateMessage(
-                  YarnMessage(s"Diagnostics for 
containerID=${status.getContainerId} in " +
-                    s"state=${status.getState}.\n${status.getDiagnostics} 
$detail")
-                )
-              }
-          }
-
-          runningContainersList = remainingRunningContainers
-
-          // return containers if the RM wants them and we haven't allocated 
them yet.
-          val preemptionMessage = response.getPreemptionMessage
-          if(preemptionMessage != null) {
-            log.info(s"Received preemtion message from YARN 
$preemptionMessage.")
-            val contract = preemptionMessage.getContract
-            if(contract != null) {
-              tryToReturnContainers(contract.getContainers.asScala.toSet)
-            }
-            val strictContract = preemptionMessage.getStrictContract
-            if(strictContract != null) {
-              tryToReturnContainers(strictContract.getContainers.asScala.toSet)
-            }
-          }
-
-          // ---------------------------- decide if we need to do anything 
---------
-
-          // check if we want to start some of our allocated containers.
-          if(runningContainers < numTaskManagers) {
-            val missingContainers = numTaskManagers - runningContainers
-            log.info(s"The user requested $numTaskManagers containers, 
$runningContainers " +
-              s"running. $missingContainers containers missing")
-
-            val numStartedContainers = 
startTMsInAllocatedContainers(missingContainers)
-
-            // if there are still containers missing, request them from YARN
-            val toAllocateFromYarn = Math.max(
-              missingContainers - numStartedContainers - numPendingRequests,
-              0)
-
-            if (toAllocateFromYarn > 0) {
-              val reallocate = flinkConfiguration
-                .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, 
true)
-              log.info(s"There are $missingContainers containers missing." +
-                s" $numPendingRequests are already requested. " +
-                s"Requesting $toAllocateFromYarn additional container(s) from 
YARN. " +
-                s"Reallocation of failed containers is enabled=$reallocate " +
-                s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
-              // there are still containers missing. Request them from YARN
-              if (reallocate) {
-                for(i <- 1 to toAllocateFromYarn) {
-                  val containerRequest = 
getContainerRequest(memoryPerTaskManager)
-                  rmClient.addContainerRequest(containerRequest)
-                  numPendingRequests += 1
-                  log.info("Requested additional container from YARN. Pending 
requests " +
-                    s"$numPendingRequests.")
-                }
-              }
-            }
-          }
-
-          if(runningContainers >= numTaskManagers && 
allocatedContainersList.nonEmpty) {
-            log.info(s"Flink has ${allocatedContainersList.size} allocated 
containers which " +
-              s"are not needed right now. Returning them")
-            for(container <- allocatedContainersList) {
-              rmClient.releaseAssignedContainer(container.getId)
-            }
-            allocatedContainersList = List()
-          }
-
-          // maxFailedContainers == -1 is infinite number of retries.
-          if(maxFailedContainers != -1 && failedContainers >= 
maxFailedContainers) {
-            val msg = s"Stopping YARN session because the number of failed " +
-              s"containers ($failedContainers) exceeded the maximum failed 
container " +
-              s"count ($maxFailedContainers). This number is controlled by " +
-              s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' 
configuration " +
-              s"setting. By default its the number of requested containers"
-            log.error(msg)
-            self ! 
decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
-          }
-
-          // schedule next heartbeat:
-          if (runningContainers < numTaskManagers) {
-            // we don't have the requested number of containers. Do fast 
polling
-            context.system.scheduler.scheduleOnce(
-              FAST_YARN_HEARTBEAT_DELAY,
-              self,
-              decorateMessage(HeartbeatWithYarn))
-          } else {
-            // everything is good, slow down polling
-            context.system.scheduler.scheduleOnce(
-              YARN_HEARTBEAT_DELAY,
-              self,
-              decorateMessage(HeartbeatWithYarn))
-          }
-        case None =>
-          log.error("The AMRMClient was not set.")
-          self ! decorateMessage(
-            StopYarnSession(
-              FinalApplicationStatus.FAILED,
-              "Fatal error in AM: AMRMClient was not set")
-          )
-      }
-      log.debug(s"Processed Heartbeat with RMClient. Running containers 
$runningContainers, " +
-        s"failed containers $failedContainers, " +
-        s"allocated containers ${allocatedContainersList.size}.")
   }
 
-  /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in 
the available
-    * allocated containers. The number of successfully started TaskManagers is 
returned.
-    *
-    * @param numTMsToStart Number of TaskManagers to start if enough allocated 
containers are
-    *                      available. If not, then all allocated containers 
are used
-    * @return Number of successfully started TaskManagers
-    */
-  private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
-    // not enough containers running
-    if (allocatedContainersList.nonEmpty) {
-      log.info(s"${allocatedContainersList.size} containers already allocated 
by YARN. " +
-        "Starting...")
-
-      nmClientOption match {
-        case Some(nmClient) =>
-          containerLaunchContext match {
-            case Some(ctx) =>
-              val (containersToBeStarted, remainingContainers) = 
allocatedContainersList
-                .splitAt(numTMsToStart)
-
-              val startedContainers = containersToBeStarted.flatMap {
-                container =>
-                  try {
-                    nmClient.startContainer(container, ctx)
-                    val message = s"Launching container (${container.getId} on 
host " +
-                      s"${container.getNodeId.getHost})."
-                    log.info(message)
-
-                    messageListener foreach {
-                      _ ! decorateMessage(YarnMessage(message))
-                    }
-
-                    Some(container)
-                  } catch {
-                    case e: YarnException =>
-                      log.error(s"Exception while starting YARN " +
-                        s"container ${container.getId} on " +
-                        s"host ${container.getNodeId.getHost}", e)
-                      None
-                  }
-              }
-
-              runningContainers += startedContainers.length
-              runningContainersList :::= startedContainers
-
-              allocatedContainersList = remainingContainers
-
-              startedContainers.length
-            case None =>
-              log.error("The ContainerLaunchContext was not set.")
-              self ! decorateMessage(
-                StopYarnSession(
-                  FinalApplicationStatus.FAILED,
-                  "Fatal error in AM: The ContainerLaunchContext was not 
set."))
-              0
-          }
-        case None =>
-          log.error("The NMClient was not set.")
-          self ! decorateMessage(
-            StopYarnSession(
-              FinalApplicationStatus.FAILED,
-              "Fatal error in AM: The NMClient was not set."))
-          0
-      }
-    } else {
-      0
-    }
-  }
 
   private def runningContainerIds(): List[ContainerId] = {
-    runningContainersList map { runningCont => runningCont.getId}
+    runningContainersList map { runningCont => runningCont.getId }
   }
+
   private def allocatedContainerIds(): List[ContainerId] = {
-    allocatedContainersList map { runningCont => runningCont.getId}
+    allocatedContainersList map { runningCont => runningCont.getId }
   }
 
-  /** Starts the Yarn session by connecting to the RessourceManager and the 
NodeManager. After
+  /** Starts the Yarn session by connecting to the ResourceManager and the 
NodeManager. After
     * a connection has been established, the number of missing containers is 
requested from Yarn.
     *
     * @param conf Hadoop configuration object
@@ -495,7 +288,7 @@ class YarnJobManager(
           YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS),
         MILLISECONDS)
 
-      if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
+      if (YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
         log.warn(s"The heartbeat interval of the Flink Application master " +
           s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
           s"($yarnExpiryInterval). The application is likely to be killed by 
YARN.")
@@ -516,11 +309,19 @@ class YarnJobManager(
       val shipListString = env.get(FlinkYarnClientBase.ENV_CLIENT_SHIP_FILES)
       val yarnClientUsername = env.get(FlinkYarnClientBase.ENV_CLIENT_USERNAME)
 
-      val rm = AMRMClient.createAMRMClient[ContainerRequest]()
-      rm.init(conf)
-      rm.start()
 
-      rmClientOption = Some(rm)
+      // wrap default client in asynchronous handler thread
+      val rmClientAsync: AMRMClientAsync[ContainerRequest] = 
AMRMClientAsync.createAMRMClientAsync(
+        FAST_YARN_HEARTBEAT_DELAY.toMillis.toInt,
+        AMRMClientAsyncHandler)
+
+      // inject client into handler to adjust the heartbeat interval and make 
requests
+      AMRMClientAsyncHandler.setClient(rmClientAsync)
+
+      rmClientAsync.init(conf)
+      rmClientAsync.start()
+
+      rmClientOption = Some(rmClientAsync)
 
       val nm = NMClient.createNMClient()
       nm.init(conf)
@@ -535,7 +336,7 @@ class YarnJobManager(
 
       val actorSystemPort = AkkaUtils.getAddress(system).port.getOrElse(-1)
 
-      val response = rm.registerApplicationMaster(
+      val response = rmClientAsync.registerApplicationMaster(
         applicationMasterHost,
         actorSystemPort,
         url)
@@ -555,7 +356,7 @@ class YarnJobManager(
           log.info(s"Requesting initial TaskManager container $i.")
           numPendingRequests += 1
           // these are initial requests. The reallocation setting doesn't 
affect this.
-          rm.addContainerRequest(containerRequest)
+          rmClientAsync.addContainerRequest(containerRequest)
       }
 
       val flinkJar = Records.newRecord(classOf[LocalResource])
@@ -601,10 +402,6 @@ class YarnJobManager(
           taskManagerLocalResources)
       )
 
-      context.system.scheduler.scheduleOnce(
-        FAST_YARN_HEARTBEAT_DELAY,
-        self,
-        decorateMessage(HeartbeatWithYarn))
     } recover {
       case t: Throwable =>
         log.error("Could not start yarn session.", t)
@@ -622,24 +419,11 @@ class YarnJobManager(
     *                 living containers.
     * @return Seq of living containers which could be retrieved
     */
-  private def getContainersFromPreviousAttempts(
-      response: RegisterApplicationMasterResponse)
-    : Seq[Container] = {
-
+  private def getContainersFromPreviousAttempts(response: 
RegisterApplicationMasterResponse)
+      : Seq[Container] = {
     
RegisterApplicationMasterResponseReflector.getContainersFromPreviousAttempts(response)
   }
 
-  private def tryToReturnContainers(returnRequest: Set[PreemptionContainer]): 
Unit = {
-    for(requestedBackContainers <- returnRequest) {
-      allocatedContainersList = allocatedContainersList.dropWhile( container 
=> {
-        val result = requestedBackContainers.getId.equals(container.getId)
-        if(result) {
-          log.info(s"Returning container $container back to ResourceManager.")
-        }
-        result
-      })
-    }
-  }
 
   private def getContainerRequest(memoryPerTaskManager: Int): ContainerRequest 
= {
     // Priority for worker containers - priorities are intra-application
@@ -659,7 +443,7 @@ class YarnJobManager(
       hasLog4j: Boolean,
       yarnClientUsername: String,
       yarnConf: Configuration,
-      taskManagerLocalResources: Map[String, LocalResource]) : 
ContainerLaunchContext = {
+      taskManagerLocalResources: Map[String, LocalResource]): 
ContainerLaunchContext = {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
 
@@ -736,7 +520,7 @@ class YarnJobManager(
     val useOffHeap = flinkConfiguration.getBoolean(
       ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
 
-    if (useOffHeap && eagerAllocation){
+    if (useOffHeap && eagerAllocation) {
       val fixedOffHeapSize = flinkConfiguration.getLong(
         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
 
@@ -754,6 +538,248 @@ class YarnJobManager(
       memoryLimit
     }
   }
+
+  /**
+    * Heartbeats with the resource manager and handles container updates.
+    */
+  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
+
+    /*
+     * Asynchronous client to make requests to the RM.
+     * Must be set via setClient(..) before its service is started.
+     */
+    private var client : AMRMClientAsync[ContainerRequest] = null
+
+    override def onError(e: Throwable): Unit = {
+      self ! decorateMessage(
+        StopYarnSession(
+          FinalApplicationStatus.FAILED,
+          "Error in communication with Yarn resource manager: " + e.getMessage)
+      )
+    }
+
+    override def getProgress: Float = {
+      runningContainers.toFloat / numTaskManagers
+    }
+
+    override def onShutdownRequest(): Unit = {
+      // We are getting killed anyway
+    }
+
+    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = {
+      // We are not interested in node updates
+    }
+
+    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): 
Unit = {
+
+      val completedContainerStatuses = statuses.asScala
+      val idStatusMap = completedContainerStatuses
+        .map(status => (status.getContainerId, status)).toMap
+
+      completedContainerStatuses.foreach {
+        status => log.info(s"Container ${status.getContainerId} is completed " 
+
+          s"with diagnostics: ${status.getDiagnostics}")
+      }
+
+      // get failed containers (returned containers are also completed, so we have to
+      // distinguish if it was running before).
+      val (completedContainers, remainingRunningContainers) = 
runningContainersList
+        .partition(idStatusMap contains _.getId)
+
+      completedContainers.foreach {
+        container =>
+          val status = idStatusMap(container.getId)
+          failedContainers += 1
+          runningContainers -= 1
+          log.info(s"Container ${status.getContainerId} was a running 
container. " +
+            s"Total failed containers $failedContainers.")
+          val detail = status.getExitStatus match {
+            case -103 => "Vmem limit exceeded";
+            case -104 => "Pmem limit exceeded";
+            case _ => ""
+          }
+          messageListener foreach {
+            _ ! decorateMessage(
+              YarnMessage(s"Diagnostics for 
containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+            )
+          }
+      }
+
+      runningContainersList = remainingRunningContainers
+
+      // maxFailedContainers == -1 is infinite number of retries.
+      if (maxFailedContainers != -1 && failedContainers >= 
maxFailedContainers) {
+        val msg = s"Stopping YARN session because the number of failed " +
+          s"containers ($failedContainers) exceeded the maximum failed 
container " +
+          s"count ($maxFailedContainers). This number is controlled by " +
+          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration 
" +
+          s"setting. By default its the number of requested containers"
+        log.error(msg)
+        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, 
msg))
+
+      }
+
+      allocateContainers()
+
+    }
+
+    override def onContainersAllocated(containers: JavaList[Container]): Unit 
= {
+
+      val newlyAllocatedContainers = containers.asScala
+
+      newlyAllocatedContainers.foreach {
+        container => log.info(s"Got new container for allocation: 
${container.getId}")
+      }
+
+      allocatedContainersList ++= containers.asScala
+      numPendingRequests = math.max(0, numPendingRequests - 
newlyAllocatedContainers.length)
+
+      allocateContainers()
+
+      if (runningContainers >= numTaskManagers && 
allocatedContainersList.nonEmpty) {
+        log.info(s"Flink has ${allocatedContainersList.size} allocated 
containers which " +
+          s"are not needed right now. Returning them")
+        for (container <- allocatedContainersList) {
+          client.releaseAssignedContainer(container.getId)
+        }
+        allocatedContainersList = List()
+      }
+    }
+
+    /**
+      * Allocates new containers if necessary.
+      */
+    private def allocateContainers() : Unit = {
+
+      // check if we want to start some of our allocated containers.
+      if (runningContainers < numTaskManagers) {
+        val missingContainers = numTaskManagers - runningContainers
+        log.info(s"The user requested $numTaskManagers containers, 
$runningContainers " +
+          s"running. $missingContainers containers missing")
+
+        val numStartedContainers = 
startTMsInAllocatedContainers(missingContainers)
+
+        // if there are still containers missing, request them from YARN
+        val toAllocateFromYarn = Math.max(
+          missingContainers - numStartedContainers - numPendingRequests,
+          0)
+
+        if (toAllocateFromYarn > 0) {
+          val reallocate = flinkConfiguration
+            .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, 
true)
+          log.info(s"There are $missingContainers containers missing." +
+            s" $numPendingRequests are already requested. " +
+            s"Requesting $toAllocateFromYarn additional container(s) from 
YARN. " +
+            s"Reallocation of failed containers is enabled=$reallocate " +
+            s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
+          // there are still containers missing. Request them from YARN
+          if (reallocate) {
+            for (i <- 1 to toAllocateFromYarn) {
+              val containerRequest = getContainerRequest(memoryPerTaskManager)
+              client.addContainerRequest(containerRequest)
+              numPendingRequests += 1
+              log.info("Requested additional container from YARN. Pending 
requests " +
+                s"$numPendingRequests.")
+            }
+          }
+        }
+      }
+    }
+
+    /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
+      * allocated containers. The number of successfully started TaskManagers is returned.
+      *
+      * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
+      *                      available. If not, then all allocated containers are used
+      * @return Number of successfully started TaskManagers
+      */
+    private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
+      // not enough containers running
+      if (allocatedContainersList.nonEmpty) {
+        log.info(s"${allocatedContainersList.size} containers already 
allocated by YARN. " +
+          "Starting...")
+
+        nmClientOption match {
+          case Some(nmClient) =>
+            containerLaunchContext match {
+              case Some(ctx) =>
+                val (containersToBeStarted, remainingContainers) = 
allocatedContainersList
+                  .splitAt(numTMsToStart)
+
+                val startedContainers = containersToBeStarted.flatMap {
+                  container =>
+                    try {
+                      nmClient.startContainer(container, ctx)
+                      val message = s"Launching container (${container.getId} 
on host " +
+                        s"${container.getNodeId.getHost})."
+                      log.info(message)
+
+                      messageListener foreach {
+                        _ ! decorateMessage(YarnMessage(message))
+                      }
+
+                      Some(container)
+                    } catch {
+                      case e: YarnException =>
+                        log.error(s"Exception while starting YARN " +
+                          s"container ${container.getId} on " +
+                          s"host ${container.getNodeId.getHost}", e)
+                        None
+                    }
+                }
+
+                runningContainers += startedContainers.length
+                runningContainersList :::= startedContainers
+
+                allocatedContainersList = remainingContainers
+
+                if (runningContainers < numTaskManagers) {
+                  setHeartbeatRate(FAST_YARN_HEARTBEAT_DELAY)
+                } else {
+                  setHeartbeatRate(YARN_HEARTBEAT_DELAY)
+                }
+
+                startedContainers.length
+              case None =>
+                log.error("The ContainerLaunchContext was not set.")
+                self ! decorateMessage(
+                  StopYarnSession(
+                    FinalApplicationStatus.FAILED,
+                    "Fatal error in AM: The ContainerLaunchContext was not 
set."))
+                0
+            }
+          case None =>
+            log.error("The NMClient was not set.")
+            self ! decorateMessage(
+              StopYarnSession(
+                FinalApplicationStatus.FAILED,
+                "Fatal error in AM: The NMClient was not set."))
+            0
+        }
+      } else {
+        0
+      }
+    }
+
+    /**
+      * Adjusts the heartbeat interval of the asynchronous client.
+      * @param interval The interval between the heartbeats.
+      */
+    private def setHeartbeatRate(interval : FiniteDuration): Unit = {
+      client.setHeartbeatInterval(interval.toMillis.toInt)
+    }
+
+    /**
+      * Register the client with the CallbackHandler. Must be called before the client is started.
+      * @param clientAsync The AMRM client to make requests with.
+      */
+    def setClient(clientAsync: AMRMClientAsync[ContainerRequest]) = {
+      client = clientAsync
+    }
+
+  }
+
 }
 
 /** Singleton object to reflect the getContainersFromPreviousAttempts method 
for

Reply via email to