Repository: flink
Updated Branches:
  refs/heads/master d6e9fa046 -> 4e52fe430
[FLINK-3134][yarn] asynchronous YarnJobManager heartbeats

- use AMRMClientAsync instead of AMRMClient
- handle allocation and startup of containers in callbacks
- remove YarnHeartbeat message

The AMRMClientAsync uses one thread to communicate with the resource
manager and an additional thread to execute the callbacks.

This closes #1450.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e52fe43
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e52fe43
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e52fe43

Branch: refs/heads/master
Commit: 4e52fe4304566e5239996b3d48290e0c1f0772e8
Parents: d6e9fa0
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Dec 7 14:35:04 2015 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Dec 17 15:19:48 2015 +0100

----------------------------------------------------------------------
 .../flink/yarn/YARNSessionFIFOITCase.java      |   4 +-
 .../org/apache/flink/yarn/YarnTestBase.java    |   5 +-
 .../flink/yarn/TestingYarnJobManager.scala     |   1 -
 .../apache/flink/yarn/ApplicationClient.scala  |   2 +
 .../org/apache/flink/yarn/YarnJobManager.scala | 560 ++++++++++---------
 5 files changed, 301 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 69925f2..14ee8ec 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -284,14 +284,14 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			}
 			// stateful termination check:
-			// wait until we saw a container being killed and AFTERWARDS a new one launced
+			// wait until we saw a container being killed and AFTERWARDS a new one launched
 			boolean ok = false;
 			do {
 				LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
 				String o = errContent.toString();
 				int killedOff = o.indexOf("Container killed by the ApplicationMaster");
-				if(killedOff != -1) {
+				if (killedOff != -1) {
 					o = o.substring(killedOff);
 					ok = o.indexOf("Launching container") > 0;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 5ce528e..d3132d7 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -89,7 +89,10 @@ public abstract class YarnTestBase extends TestLogger {

 	/** These strings are white-listed, overriding teh prohibited strings */
 	protected final static String[] WHITELISTED_STRINGS = {
-			"akka.remote.RemoteTransportExceptionNoStackTrace"
+			"akka.remote.RemoteTransportExceptionNoStackTrace",
+			// workaround for annoying InterruptedException logging:
+			// https://issues.apache.org/jira/browse/YARN-1022
+			"java.lang.InterruptedException"
 	};

 	// Temp directory which is deleted after the unit test.

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 3379325..a7f21e5 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -47,7 +47,6 @@ import scala.concurrent.duration.FiniteDuration
  * @param defaultExecutionRetries Number of default execution retries
  * @param delayBetweenRetries Delay between retries
  * @param timeout Timeout for futures
- * @param mode StreamingMode in which the system shall be started
  * @param leaderElectionService LeaderElectionService to participate in the leader election
  */
 class TestingYarnJobManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 79717ef..8b2c06f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -21,6 +21,8 @@ package org.apache.flink.yarn
 import java.util.UUID

 import akka.actor._
+import akka.pattern
+import akka.util.Timeout
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e52fe43/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index
8494f08..735f567 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -48,7 +48,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient} +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync +import org.apache.hadoop.yarn.client.api.NMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.YarnException @@ -108,7 +109,7 @@ class YarnJobManager( val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds val YARN_HEARTBEAT_DELAY: FiniteDuration = - if(flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) { + if (flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) { DEFAULT_YARN_HEARTBEAT_DELAY } else { FiniteDuration( @@ -123,9 +124,9 @@ class YarnJobManager( val detached = java.lang.Boolean.valueOf(env.get(FlinkYarnClientBase.ENV_DETACHED)) var stopWhenJobFinished: JobID = null - var rmClientOption: Option[AMRMClient[ContainerRequest]] = None + var rmClientOption: Option[AMRMClientAsync[ContainerRequest]] = None var nmClientOption: Option[NMClient] = None - var messageListener:Option[ActorRef] = None + var messageListener: Option[ActorRef] = None var containerLaunchContext: Option[ContainerLaunchContext] = None var runningContainers = 0 // number of currently running containers @@ -155,27 +156,28 @@ class YarnJobManager( rmClientOption foreach { rmClient => - Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{ + Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover { case t: Throwable => log.error("Could not unregister the application master.", t) } - Try(rmClient.close()).recover{ - case t:Throwable => log.error("Could not close the AMRMClient.", t) + Try(rmClient.stop()).recover { + case t: Throwable => log.error("Could not close the AMRMClient.", t) } + } rmClientOption = None nmClientOption foreach { nmClient => - Try(nmClient.close()).recover{ - case t: Throwable => log.error("Could not close the NMClient.", t) - } + Try(nmClient.close()).recover { + case t: Throwable => log.error("Could not close the NMClient.", t) + } } nmClientOption = None messageListener foreach { - _ ! decorateMessage(JobManagerStopped) + _ ! decorateMessage(JobManagerStopped) } context.system.shutdown() @@ -194,6 +196,17 @@ class YarnJobManager( val jobId = msg.jobId log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.") stopWhenJobFinished = jobId + // trigger regular job status messages (if this is a per-job yarn cluster) + if (stopWhenJobFinished != null) { + context.system.scheduler.schedule(0 seconds, + YARN_HEARTBEAT_DELAY, + new Runnable { + override def run(): Unit = { + self ! decorateMessage(RequestJobStatus(stopWhenJobFinished)) + } + } + ) + } sender() ! 
decorateMessage(Acknowledge) @@ -209,19 +222,19 @@ class YarnJobManager( case jnf: JobNotFound => log.warn(s"Job with ID ${jnf.jobID} not found in JobManager") - if(stopWhenJobFinished == null) { + if (stopWhenJobFinished == null) { log.warn("The ApplicationMaster didn't expect to receive this message") } case jobStatus: CurrentJobStatus => - if(stopWhenJobFinished == null) { + if (stopWhenJobFinished == null) { log.warn(s"Received job status $jobStatus which wasn't requested.") } else { - if(stopWhenJobFinished != jobStatus.jobID) { + if (stopWhenJobFinished != jobStatus.jobID) { log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + s"job $stopWhenJobFinished") } else { - if(jobStatus.status.isTerminalState) { + if (jobStatus.status.isTerminalState) { log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") if (jobStatus.status == JobStatus.FINISHED) { @@ -242,238 +255,18 @@ class YarnJobManager( } } } - - case HeartbeatWithYarn => - // piggyback on the YARN heartbeat to check if the job has finished - if(stopWhenJobFinished != null) { - self ! decorateMessage(RequestJobStatus(stopWhenJobFinished)) - } - rmClientOption match { - case Some(rmClient) => - log.debug("Send heartbeat to YARN") - val response = rmClient.allocate(runningContainers.toFloat / numTaskManagers) - - // ---------------------------- handle YARN responses ------------- - - val newlyAllocatedContainers = response.getAllocatedContainers.asScala - - newlyAllocatedContainers.foreach { - container => log.info(s"Got new container for allocation: ${container.getId}") - } - - allocatedContainersList ++= newlyAllocatedContainers - numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length) - - val completedContainerStatuses = response.getCompletedContainersStatuses.asScala - val idStatusMap = completedContainerStatuses - .map(status => (status.getContainerId, status)).toMap - - completedContainerStatuses.foreach { - status => log.info(s"Container ${status.getContainerId} is completed " + - s"with diagnostics: ${status.getDiagnostics}") - } - - // get failed containers (returned containers are also completed, so we have to - // distinguish if it was running before). - val (completedContainers, remainingRunningContainers) = runningContainersList - .partition(idStatusMap contains _.getId) - - completedContainers.foreach { - container => - val status = idStatusMap(container.getId) - failedContainers += 1 - runningContainers -= 1 - log.info(s"Container ${status.getContainerId} was a running container. " + - s"Total failed containers $failedContainers.") - val detail = status.getExitStatus match { - case -103 => "Vmem limit exceeded"; - case -104 => "Pmem limit exceeded"; - case _ => "" - } - messageListener foreach { - _ ! decorateMessage( - YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + - s"state=${status.getState}.\n${status.getDiagnostics} $detail") - ) - } - } - - runningContainersList = remainingRunningContainers - - // return containers if the RM wants them and we haven't allocated them yet. 
- val preemptionMessage = response.getPreemptionMessage - if(preemptionMessage != null) { - log.info(s"Received preemtion message from YARN $preemptionMessage.") - val contract = preemptionMessage.getContract - if(contract != null) { - tryToReturnContainers(contract.getContainers.asScala.toSet) - } - val strictContract = preemptionMessage.getStrictContract - if(strictContract != null) { - tryToReturnContainers(strictContract.getContainers.asScala.toSet) - } - } - - // ---------------------------- decide if we need to do anything --------- - - // check if we want to start some of our allocated containers. - if(runningContainers < numTaskManagers) { - val missingContainers = numTaskManagers - runningContainers - log.info(s"The user requested $numTaskManagers containers, $runningContainers " + - s"running. $missingContainers containers missing") - - val numStartedContainers = startTMsInAllocatedContainers(missingContainers) - - // if there are still containers missing, request them from YARN - val toAllocateFromYarn = Math.max( - missingContainers - numStartedContainers - numPendingRequests, - 0) - - if (toAllocateFromYarn > 0) { - val reallocate = flinkConfiguration - .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true) - log.info(s"There are $missingContainers containers missing." + - s" $numPendingRequests are already requested. " + - s"Requesting $toAllocateFromYarn additional container(s) from YARN. " + - s"Reallocation of failed containers is enabled=$reallocate " + - s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')") - // there are still containers missing. Request them from YARN - if (reallocate) { - for(i <- 1 to toAllocateFromYarn) { - val containerRequest = getContainerRequest(memoryPerTaskManager) - rmClient.addContainerRequest(containerRequest) - numPendingRequests += 1 - log.info("Requested additional container from YARN. Pending requests " + - s"$numPendingRequests.") - } - } - } - } - - if(runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) { - log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " + - s"are not needed right now. Returning them") - for(container <- allocatedContainersList) { - rmClient.releaseAssignedContainer(container.getId) - } - allocatedContainersList = List() - } - - // maxFailedContainers == -1 is infinite number of retries. - if(maxFailedContainers != -1 && failedContainers >= maxFailedContainers) { - val msg = s"Stopping YARN session because the number of failed " + - s"containers ($failedContainers) exceeded the maximum failed container " + - s"count ($maxFailedContainers). This number is controlled by " + - s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " + - s"setting. By default its the number of requested containers" - log.error(msg) - self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg)) - } - - // schedule next heartbeat: - if (runningContainers < numTaskManagers) { - // we don't have the requested number of containers. Do fast polling - context.system.scheduler.scheduleOnce( - FAST_YARN_HEARTBEAT_DELAY, - self, - decorateMessage(HeartbeatWithYarn)) - } else { - // everything is good, slow down polling - context.system.scheduler.scheduleOnce( - YARN_HEARTBEAT_DELAY, - self, - decorateMessage(HeartbeatWithYarn)) - } - case None => - log.error("The AMRMClient was not set.") - self ! 
decorateMessage( - StopYarnSession( - FinalApplicationStatus.FAILED, - "Fatal error in AM: AMRMClient was not set") - ) - } - log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " + - s"failed containers $failedContainers, " + - s"allocated containers ${allocatedContainersList.size}.") } - /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available - * allocated containers. The number of successfully started TaskManagers is returned. - * - * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are - * available. If not, then all allocated containers are used - * @return Number of successfully started TaskManagers - */ - private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = { - // not enough containers running - if (allocatedContainersList.nonEmpty) { - log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " + - "Starting...") - - nmClientOption match { - case Some(nmClient) => - containerLaunchContext match { - case Some(ctx) => - val (containersToBeStarted, remainingContainers) = allocatedContainersList - .splitAt(numTMsToStart) - - val startedContainers = containersToBeStarted.flatMap { - container => - try { - nmClient.startContainer(container, ctx) - val message = s"Launching container (${container.getId} on host " + - s"${container.getNodeId.getHost})." - log.info(message) - - messageListener foreach { - _ ! decorateMessage(YarnMessage(message)) - } - - Some(container) - } catch { - case e: YarnException => - log.error(s"Exception while starting YARN " + - s"container ${container.getId} on " + - s"host ${container.getNodeId.getHost}", e) - None - } - } - - runningContainers += startedContainers.length - runningContainersList :::= startedContainers - - allocatedContainersList = remainingContainers - - startedContainers.length - case None => - log.error("The ContainerLaunchContext was not set.") - self ! decorateMessage( - StopYarnSession( - FinalApplicationStatus.FAILED, - "Fatal error in AM: The ContainerLaunchContext was not set.")) - 0 - } - case None => - log.error("The NMClient was not set.") - self ! decorateMessage( - StopYarnSession( - FinalApplicationStatus.FAILED, - "Fatal error in AM: The NMClient was not set.")) - 0 - } - } else { - 0 - } - } private def runningContainerIds(): List[ContainerId] = { - runningContainersList map { runningCont => runningCont.getId} + runningContainersList map { runningCont => runningCont.getId } } + private def allocatedContainerIds(): List[ContainerId] = { - allocatedContainersList map { runningCont => runningCont.getId} + allocatedContainersList map { runningCont => runningCont.getId } } - /** Starts the Yarn session by connecting to the RessourceManager and the NodeManager. After + /** Starts the Yarn session by connecting to the ResourceManager and the NodeManager. After * a connection has been established, the number of missing containers is requested from Yarn. * * @param conf Hadoop configuration object @@ -495,7 +288,7 @@ class YarnJobManager( YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS) - if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) { + if (YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) { log.warn(s"The heartbeat interval of the Flink Application master " + s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " + s"($yarnExpiryInterval). 
The application is likely to be killed by YARN.") @@ -516,11 +309,19 @@ class YarnJobManager( val shipListString = env.get(FlinkYarnClientBase.ENV_CLIENT_SHIP_FILES) val yarnClientUsername = env.get(FlinkYarnClientBase.ENV_CLIENT_USERNAME) - val rm = AMRMClient.createAMRMClient[ContainerRequest]() - rm.init(conf) - rm.start() - rmClientOption = Some(rm) + // wrap default client in asynchronous handler thread + val rmClientAsync: AMRMClientAsync[ContainerRequest] = AMRMClientAsync.createAMRMClientAsync( + FAST_YARN_HEARTBEAT_DELAY.toMillis.toInt, + AMRMClientAsyncHandler) + + // inject client into handler to adjust the heartbeat interval and make requests + AMRMClientAsyncHandler.setClient(rmClientAsync) + + rmClientAsync.init(conf) + rmClientAsync.start() + + rmClientOption = Some(rmClientAsync) val nm = NMClient.createNMClient() nm.init(conf) @@ -535,7 +336,7 @@ class YarnJobManager( val actorSystemPort = AkkaUtils.getAddress(system).port.getOrElse(-1) - val response = rm.registerApplicationMaster( + val response = rmClientAsync.registerApplicationMaster( applicationMasterHost, actorSystemPort, url) @@ -555,7 +356,7 @@ class YarnJobManager( log.info(s"Requesting initial TaskManager container $i.") numPendingRequests += 1 // these are initial requests. The reallocation setting doesn't affect this. - rm.addContainerRequest(containerRequest) + rmClientAsync.addContainerRequest(containerRequest) } val flinkJar = Records.newRecord(classOf[LocalResource]) @@ -601,10 +402,6 @@ class YarnJobManager( taskManagerLocalResources) ) - context.system.scheduler.scheduleOnce( - FAST_YARN_HEARTBEAT_DELAY, - self, - decorateMessage(HeartbeatWithYarn)) } recover { case t: Throwable => log.error("Could not start yarn session.", t) @@ -622,24 +419,11 @@ class YarnJobManager( * living containers. 
* @return Seq of living containers which could be retrieved */ - private def getContainersFromPreviousAttempts( - response: RegisterApplicationMasterResponse) - : Seq[Container] = { - + private def getContainersFromPreviousAttempts(response: RegisterApplicationMasterResponse) + : Seq[Container] = { RegisterApplicationMasterResponseReflector.getContainersFromPreviousAttempts(response) } - private def tryToReturnContainers(returnRequest: Set[PreemptionContainer]): Unit = { - for(requestedBackContainers <- returnRequest) { - allocatedContainersList = allocatedContainersList.dropWhile( container => { - val result = requestedBackContainers.getId.equals(container.getId) - if(result) { - log.info(s"Returning container $container back to ResourceManager.") - } - result - }) - } - } private def getContainerRequest(memoryPerTaskManager: Int): ContainerRequest = { // Priority for worker containers - priorities are intra-application @@ -659,7 +443,7 @@ class YarnJobManager( hasLog4j: Boolean, yarnClientUsername: String, yarnConf: Configuration, - taskManagerLocalResources: Map[String, LocalResource]) : ContainerLaunchContext = { + taskManagerLocalResources: Map[String, LocalResource]): ContainerLaunchContext = { log.info("Create container launch context.") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) @@ -736,7 +520,7 @@ class YarnJobManager( val useOffHeap = flinkConfiguration.getBoolean( ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false) - if (useOffHeap && eagerAllocation){ + if (useOffHeap && eagerAllocation) { val fixedOffHeapSize = flinkConfiguration.getLong( ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L) @@ -754,6 +538,248 @@ class YarnJobManager( memoryLimit } } + + /** + * Heartbeats with the resource manager and handles container updates. + */ + object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler { + + /* + * Asynchronous client to make requests to the RM. + * Must be set via setClient(..) before its service is started. + */ + private var client : AMRMClientAsync[ContainerRequest] = null + + override def onError(e: Throwable): Unit = { + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Error in communication with Yarn resource manager: " + e.getMessage) + ) + } + + override def getProgress: Float = { + runningContainers.toFloat / numTaskManagers + } + + override def onShutdownRequest(): Unit = { + // We are getting killed anyway + } + + override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = { + // We are not interested in node updates + } + + override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = { + + val completedContainerStatuses = statuses.asScala + val idStatusMap = completedContainerStatuses + .map(status => (status.getContainerId, status)).toMap + + completedContainerStatuses.foreach { + status => log.info(s"Container ${status.getContainerId} is completed " + + s"with diagnostics: ${status.getDiagnostics}") + } + + // get failed containers (returned containers are also completed, so we have to + // distinguish if it was running before). + val (completedContainers, remainingRunningContainers) = runningContainersList + .partition(idStatusMap contains _.getId) + + completedContainers.foreach { + container => + val status = idStatusMap(container.getId) + failedContainers += 1 + runningContainers -= 1 + log.info(s"Container ${status.getContainerId} was a running container. 
" + + s"Total failed containers $failedContainers.") + val detail = status.getExitStatus match { + case -103 => "Vmem limit exceeded"; + case -104 => "Pmem limit exceeded"; + case _ => "" + } + messageListener foreach { + _ ! decorateMessage( + YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + + s"state=${status.getState}.\n${status.getDiagnostics} $detail") + ) + } + } + + runningContainersList = remainingRunningContainers + + // maxFailedContainers == -1 is infinite number of retries. + if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) { + val msg = s"Stopping YARN session because the number of failed " + + s"containers ($failedContainers) exceeded the maximum failed container " + + s"count ($maxFailedContainers). This number is controlled by " + + s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " + + s"setting. By default its the number of requested containers" + log.error(msg) + self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg)) + + } + + allocateContainers() + + } + + override def onContainersAllocated(containers: JavaList[Container]): Unit = { + + val newlyAllocatedContainers = containers.asScala + + newlyAllocatedContainers.foreach { + container => log.info(s"Got new container for allocation: ${container.getId}") + } + + allocatedContainersList ++= containers.asScala + numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length) + + allocateContainers() + + if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) { + log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " + + s"are not needed right now. Returning them") + for (container <- allocatedContainersList) { + client.releaseAssignedContainer(container.getId) + } + allocatedContainersList = List() + } + } + + /** + * Allocates new containers if necessary. + */ + private def allocateContainers() : Unit = { + + // check if we want to start some of our allocated containers. + if (runningContainers < numTaskManagers) { + val missingContainers = numTaskManagers - runningContainers + log.info(s"The user requested $numTaskManagers containers, $runningContainers " + + s"running. $missingContainers containers missing") + + val numStartedContainers = startTMsInAllocatedContainers(missingContainers) + + // if there are still containers missing, request them from YARN + val toAllocateFromYarn = Math.max( + missingContainers - numStartedContainers - numPendingRequests, + 0) + + if (toAllocateFromYarn > 0) { + val reallocate = flinkConfiguration + .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true) + log.info(s"There are $missingContainers containers missing." + + s" $numPendingRequests are already requested. " + + s"Requesting $toAllocateFromYarn additional container(s) from YARN. " + + s"Reallocation of failed containers is enabled=$reallocate " + + s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')") + // there are still containers missing. Request them from YARN + if (reallocate) { + for (i <- 1 to toAllocateFromYarn) { + val containerRequest = getContainerRequest(memoryPerTaskManager) + client.addContainerRequest(containerRequest) + numPendingRequests += 1 + log.info("Requested additional container from YARN. Pending requests " + + s"$numPendingRequests.") + } + } + } + } + } + + /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available + * allocated containers. 
The number of successfully started TaskManagers is returned. + * + * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are + * available. If not, then all allocated containers are used + * @return Number of successfully started TaskManagers + */ + private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = { + // not enough containers running + if (allocatedContainersList.nonEmpty) { + log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " + + "Starting...") + + nmClientOption match { + case Some(nmClient) => + containerLaunchContext match { + case Some(ctx) => + val (containersToBeStarted, remainingContainers) = allocatedContainersList + .splitAt(numTMsToStart) + + val startedContainers = containersToBeStarted.flatMap { + container => + try { + nmClient.startContainer(container, ctx) + val message = s"Launching container (${container.getId} on host " + + s"${container.getNodeId.getHost})." + log.info(message) + + messageListener foreach { + _ ! decorateMessage(YarnMessage(message)) + } + + Some(container) + } catch { + case e: YarnException => + log.error(s"Exception while starting YARN " + + s"container ${container.getId} on " + + s"host ${container.getNodeId.getHost}", e) + None + } + } + + runningContainers += startedContainers.length + runningContainersList :::= startedContainers + + allocatedContainersList = remainingContainers + + if (runningContainers < numTaskManagers) { + setHeartbeatRate(FAST_YARN_HEARTBEAT_DELAY) + } else { + setHeartbeatRate(YARN_HEARTBEAT_DELAY) + } + + startedContainers.length + case None => + log.error("The ContainerLaunchContext was not set.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Fatal error in AM: The ContainerLaunchContext was not set.")) + 0 + } + case None => + log.error("The NMClient was not set.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Fatal error in AM: The NMClient was not set.")) + 0 + } + } else { + 0 + } + } + + /** + * Adjusts the heartbeat interval of the asynchronous client. + * @param interval The interval between the heartbeats. + */ + private def setHeartbeatRate(interval : FiniteDuration): Unit = { + client.setHeartbeatInterval(interval.toMillis.toInt) + } + + /** + * Register the client with the CallbackHandler. Must be called before the client is started. + * @param clientAsync The AMRM client to make requests with. + */ + def setClient(clientAsync: AMRMClientAsync[ContainerRequest]) = { + client = clientAsync + } + + } + } /** Singleton object to reflect the getContainersFromPreviousAttempts method for
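
For readers skimming the change: the commit replaces Flink's explicit HeartbeatWithYarn polling loop with YARN's AMRMClientAsync, whose callback handler reacts to allocated and completed containers while an internal thread performs the periodic heartbeat to the ResourceManager. The following standalone Scala sketch is not taken from the Flink sources; names such as AsyncHeartbeatSketch, numTaskManagers, and the 500 ms interval are illustrative assumptions. It only uses Hadoop YARN APIs that also appear in the diff above, and a real ApplicationMaster would additionally have to run inside a YARN AM container with valid tokens and start the allocated containers via an NMClient, as Flink's YarnJobManager does.

import java.util.{List => JavaList}

import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration

object AsyncHeartbeatSketch {

  // Illustrative values, not Flink configuration settings.
  private val numTaskManagers = 2
  @volatile private var runningContainers = 0

  // Callbacks run on the AMRMClientAsync callback thread; a second internal
  // thread sends the periodic allocate() heartbeat to the ResourceManager,
  // which is what removes the need for an explicit heartbeat actor message.
  private val handler = new AMRMClientAsync.CallbackHandler {

    override def onContainersAllocated(containers: JavaList[Container]): Unit = {
      // Flink's YarnJobManager launches TaskManagers here via the NMClient;
      // this sketch only tracks the count.
      runningContainers += containers.size()
    }

    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
      runningContainers -= statuses.size()
    }

    override def onShutdownRequest(): Unit = ()  // the AM is being shut down anyway
    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = ()
    override def onError(e: Throwable): Unit = e.printStackTrace()

    // Progress reported to the RM with every heartbeat.
    override def getProgress: Float = runningContainers.toFloat / numTaskManagers
  }

  def main(args: Array[String]): Unit = {
    // Heartbeat every 500 ms; the rate can later be relaxed via setHeartbeatInterval().
    val client: AMRMClientAsync[ContainerRequest] =
      AMRMClientAsync.createAMRMClientAsync(500, handler)
    client.init(new YarnConfiguration())
    client.start()

    // Register the AM, then request containers; results arrive via the callbacks above.
    client.registerApplicationMaster("localhost", 0, "")
    val capability = Resource.newInstance(1024, 1)
    for (_ <- 1 to numTaskManagers) {
      client.addContainerRequest(
        new ContainerRequest(capability, null, null, Priority.newInstance(0)))
    }

    Thread.sleep(10000)  // stand-in for the application's actual lifetime

    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
    client.stop()
  }
}

The design point the commit message highlights is visible here: AMRMClientAsync owns both the heartbeat thread and the callback thread, so the JobManager actor no longer schedules HeartbeatWithYarn messages itself and only adjusts the heartbeat interval when containers are missing.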