Repository: incubator-samza
Updated Branches:
  refs/heads/master bd2fb6776 -> 4e5573b17
SAMZA-58; Use YARN's AMRMClientAsync client library

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4e5573b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4e5573b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4e5573b1

Branch: refs/heads/master
Commit: 4e5573b172a76e8d1413efb4152f2976ea54304f
Parents: bd2fb67
Author: Zhijie Shen <[email protected]>
Authored: Mon Apr 7 08:34:32 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon Apr 7 08:34:32 2014 -0700

----------------------------------------------------------------------
 build.gradle | 1 +
 .../org/apache/samza/config/YarnConfig.scala | 6 +
 .../apache/samza/job/yarn/SamzaAppMaster.scala | 95 +++++++++---
 .../job/yarn/SamzaAppMasterLifecycle.scala | 16 +-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala | 5 -
 .../job/yarn/SamzaAppMasterTaskManager.scala | 59 ++++----
 .../apache/samza/job/yarn/YarnAppMaster.scala | 85 -----------
 .../samza/job/yarn/YarnAppMasterListener.scala | 5 -
 .../samza/job/yarn/TestSamzaAppMaster.scala | 150 +++++++++++++++++++
 .../job/yarn/TestSamzaAppMasterLifecycle.scala | 50 +++----
 .../yarn/TestSamzaAppMasterTaskManager.scala | 106 ++++++-------
 .../samza/job/yarn/TestYarnAppMaster.scala | 142 ------------------
 12 files changed, 347 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 75b6c98..b54cee6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -151,6 +151,7 @@ project(":samza-yarn_$scalaVersion") {
       exclude module: 'slf4j-api'
     }
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 
   repositories {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala index 6c3aa92..5756d3a 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala @@ -30,6 +30,7 @@ object YarnConfig { val AM_JVM_OPTIONS = "yarn.am.opts" val AM_JMX_ENABLED = "yarn.am.jmx.enabled" val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb" + val AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms" implicit def Config2Yarn(config: Config) = new YarnConfig(config) } @@ -69,5 +70,10 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) { case _ => None } + def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS) match { + case Some(interval) => Some(interval.toInt) + case _ => None + } + def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index de6887d..c28c9a6 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -18,21 +18,27 @@ */ package org.apache.samza.job.yarn -import org.apache.samza.config.MapConfig -import org.apache.samza.config.YarnConfig -import org.apache.samza.config.serializers.JsonConfigSerializer + +import scala.collection.JavaConversions.asScalaBuffer + 
import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, NodeReport } +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils -import scala.collection.JavaConversions._ -import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap } -import grizzled.slf4j.Logging -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -import org.apache.samza.config.YarnConfig._ -import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._ -import org.apache.samza.util.hadoop.HttpFileSystem -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.samza.config.MapConfig import org.apache.samza.config.ShellCommandConfig +import org.apache.samza.config.YarnConfig +import org.apache.samza.config.YarnConfig.Config2Yarn +import org.apache.samza.config.serializers.JsonConfigSerializer +import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM +import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CPU_CORES +import org.apache.samza.metrics.JmxServer +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.util.hadoop.HttpFileSystem + +import grizzled.slf4j.Logging /** * When YARN executes an application master, it needs a bash command to @@ -45,7 +51,12 @@ import org.apache.samza.config.ShellCommandConfig * YARN client, and YARN itself), and wires up everything to run Samza's * application master. 
*/ -object SamzaAppMaster extends Logging { +object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { + val DEFAULT_POLL_INTERVAL_MS: Int = 1000 + var state: SamzaAppMasterState = null + var listeners: List[YarnAppMasterListener] = null + var storedException: Throwable = null + def main(args: Array[String]) { val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString) info("got container id: %s" format containerIdStr) @@ -62,7 +73,8 @@ object SamzaAppMaster extends Logging { info("got config: %s" format config) val hConfig = new YarnConfiguration hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName) - val amClient = new AMRMClientImpl[ContainerRequest] + val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS) + val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this) val clientHelper = new ClientHelper(hConfig) val registry = new MetricsRegistryMap val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM) @@ -71,14 +83,13 @@ object SamzaAppMaster extends Logging { try { // wire up all of the yarn event listeners - val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) + state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) val service = new SamzaAppMasterService(config, state, registry, clientHelper) - val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient, hConfig) + val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient) val metrics = new SamzaAppMasterMetrics(config, state, registry) val am = new SamzaAppMasterTaskManager({ System.currentTimeMillis }, config, state, amClient, hConfig) - - // run the app master - new YarnAppMaster(List(state, service, lifecycle, metrics, am), amClient).run + listeners = List(state, service, lifecycle, metrics, am) 
+ run(amClient, listeners, hConfig, interval) } finally { // jmxServer has to be stopped or will prevent process from exiting. if (jmxServer.isDefined) { @@ -86,4 +97,52 @@ object SamzaAppMaster extends Logging { } } } + + def run(amClient: AMRMClientAsync[ContainerRequest], listeners: List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = { + try { + amClient.init(hConfig) + amClient.start + listeners.foreach(_.onInit) + var isShutdown: Boolean = false + // have the loop to prevent the process from exiting until the job is to shutdown or error occurs on amClient + while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _) && storedException == null) { + try { + Thread.sleep(interval) + } catch { + case e: InterruptedException => { + isShutdown = true + info("got interrupt in app master thread, so shutting down") + } + } + } + } finally { + // listeners has to be stopped + listeners.foreach(listener => try { + listener.onShutdown + } catch { + case e: Exception => warn("Listener %s failed to shutdown." 
format listener, e) + }) + // amClient has to be stopped + amClient.stop + } + } + + override def onContainersCompleted(statuses: java.util.List[ContainerStatus]): Unit = + statuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus))) + + override def onContainersAllocated(containers: java.util.List[Container]): Unit = + containers.foreach(container => listeners.foreach(_.onContainerAllocated(container))) + + override def onShutdownRequest: Unit = listeners.foreach(_.onReboot) + + override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = Unit + + // TODO need to think about meaningful SAMZA's progress + override def getProgress: Float = 0.0F + + override def onError(e: Throwable): Unit = { + error("Error occured in amClient's callback", e) + storedException = e + } + } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala index 5d09265..8cb9490 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala @@ -18,27 +18,26 @@ */ package org.apache.samza.job.yarn -import grizzled.slf4j.Logging -import org.apache.samza.SamzaException -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.conf.YarnConfiguration + import org.apache.hadoop.yarn.api.records.FinalApplicationStatus +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync +import org.apache.samza.SamzaException + +import grizzled.slf4j.Logging /** * Responsible for managing the lifecycle of 
the application master. Mostly, * this means registering and unregistering with the RM, and shutting down * when the RM tells us to Reboot. */ -class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging { +class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest]) extends YarnAppMasterListener with Logging { var validResourceRequest = true var shutdownMessage: String = null override def onInit() { val host = state.nodeHost - amClient.init(conf); - amClient.start - val response = amClient.registerApplicationMaster(host, state.rpcPort, "%s:%d" format (host, state.trackingPort)) // validate that the YARN cluster can handle our container resource requirements @@ -63,7 +62,6 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza override def onShutdown() { info("Shutting down.") amClient.unregisterApplicationMaster(state.status, shutdownMessage, null) - amClient.stop } override def shouldShutdown = !validResourceRequest http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index 983771d..851aae6 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -51,7 +51,6 @@ class SamzaAppMasterMetrics( val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging { val jvm = new JvmMetrics(registry) - val mEventLoops = newCounter("event-loops") 
val reporters = config.getMetricReporterNames.map(reporterName => { val metricsFactoryClassName = config .getMetricsFactoryClass(reporterName) @@ -82,10 +81,6 @@ class SamzaAppMasterMetrics( reporters.values.foreach(_.start) } - override def onEventLoop() { - mEventLoops.inc - } - override def onShutdown() { reporters.values.foreach(_.stop) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala index 9058210..418a981 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala @@ -18,38 +18,34 @@ */ package org.apache.samza.job.yarn -import org.apache.hadoop.yarn.api.records.ContainerStatus -import org.apache.hadoop.yarn.api.records.Container -import org.apache.samza.config.Config -import grizzled.slf4j.Logging -import org.apache.samza.config.YarnConfig.Config2Yarn -import org.apache.samza.config.YarnConfig -import org.apache.samza.job.CommandBuilder -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.fs.Path -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus -import org.apache.samza.util.Util + +import java.nio.ByteBuffer +import java.util.Collections + import scala.collection.JavaConversions._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.api.records.Priority -import org.apache.hadoop.yarn.api.records.Resource -import 
org.apache.hadoop.yarn.util.Records -import org.apache.hadoop.yarn.api.records.LocalResource -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.NMClient +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.records.LocalResourceType -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import java.util.Collections -import org.apache.samza.job.ShellCommandBuilder -import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.yarn.security.AMRMTokenIdentifier -import java.nio.ByteBuffer -import org.apache.hadoop.yarn.client.api.impl.NMClientImpl +import org.apache.hadoop.yarn.util.ConverterUtils +import org.apache.hadoop.yarn.util.Records +import org.apache.samza.config.Config +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.config.YarnConfig +import org.apache.samza.config.YarnConfig.Config2Yarn +import org.apache.samza.job.CommandBuilder +import org.apache.samza.job.ShellCommandBuilder +import org.apache.samza.util.Util + +import grizzled.slf4j.Logging object SamzaAppMasterTaskManager { val DEFAULT_CONTAINER_MEM = 1024 @@ -66,7 +62,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long) * containers, handling failures, and notifying the application master that the * job is done. 
*/ -class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging { +class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging { import SamzaAppMasterTaskManager._ state.taskCount = config.getTaskCount match { @@ -79,14 +75,15 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA val allSystemStreamPartitions = Util.getInputStreamPartitions(config) var taskFailures = Map[Int, TaskFailure]() var tooManyFailedContainers = false - var containerManager: NMClientImpl = null + // TODO we might want to use NMClientAsync as well + var containerManager: NMClient = null override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers override def onInit() { state.neededContainers = state.taskCount state.unclaimedTasks = (0 until state.taskCount).toSet - containerManager = new NMClientImpl() + containerManager = NMClient.createNMClient() containerManager.init(conf) containerManager.start http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala deleted file mode 100644 index e45c177..0000000 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn - -import scala.collection.JavaConversions._ -import grizzled.slf4j.Logging -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.api.records.AMCommand._ -import org.apache.samza.SamzaException - -/** - * YARN's API is somewhat clunky. Most implementations just sit in a loop, and - * poll the resource manager every N seconds (see the distributed shell - * example). To make life slightly better, Samza separates the polling logic - * from the application master logic, and we convert synchronous polling calls - * to callbacks, which are more intuitive when dealing with event based - * paradigms like YARN. - * - * <br/><br/> - * - * SamzaAppMaster uses this class to wire up all of Samza's application master - * listeners. 
- */ -class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging { - var isShutdown = false - - def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient) - - def run { - try { - listeners.foreach(_.onInit) - - while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) { - val response = amClient.allocate(0) - - if (response.getAMCommand != null) { - response.getAMCommand match { - case AM_RESYNC | AM_SHUTDOWN => - listeners.foreach(_.onReboot) - case _ => - val msg = "Unhandled value of AMCommand: " + response.getAMCommand - error(msg); - throw new SamzaException(msg); - } - } - - listeners.foreach(_.onEventLoop) - response.getCompletedContainersStatuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus))) - response.getAllocatedContainers.foreach(container => listeners.foreach(_.onContainerAllocated(container))) - - try { - Thread.sleep(pollIntervalMs) - } catch { - case e: InterruptedException => { - isShutdown = true - info("got interrupt in app master thread, so shutting down") - } - } - } - } finally { - listeners.foreach(listener => try { - listener.onShutdown - } catch { - case e: Exception => warn("Listener %s failed to shutdown." 
format listener, e) - }) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala index 1353e86..6bf3046 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala @@ -65,9 +65,4 @@ trait YarnAppMasterListener { */ def onContainerCompleted(containerStatus: ContainerStatus) {} - /** - * Invoked by YarnAppMaster once per listener, every time it loops around to - * poll the RM again. - */ - def onEventLoop() {} } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala new file mode 100644 index 0000000..190ce28 --- /dev/null +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn + +import scala.annotation.elidable +import scala.annotation.elidable.ASSERTION + +import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus } +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.junit.Test + +import TestSamzaAppMasterTaskManager._ + +class TestSamzaAppMaster { + @Test + def testAppMasterShouldShutdown { + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) + val listener = new YarnAppMasterListener { + var init = 0 + var shutdown = 0 + var allocated = 0 + var complete = 0 + override def shouldShutdown = true + override def onInit() { + init += 1 + } + override def onShutdown() { + shutdown += 1 + } + override def onContainerAllocated(container: Container) { + allocated += 1 + } + override def onContainerCompleted(containerStatus: ContainerStatus) { + complete += 1 + } + } + SamzaAppMaster.listeners = List(listener) + SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) + assert(listener.init == 1) + assert(listener.shutdown == 1) + } + + @Test + def testAppMasterShouldShutdownWithFailingListener { + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) + val listener1 = new YarnAppMasterListener { + var shutdown = 0 + override def shouldShutdown = true + override def onShutdown() { + shutdown += 1 + throw new RuntimeException("Some weird failure") + } + } + val listener2 = new YarnAppMasterListener { + var shutdown = 0 + override def shouldShutdown = 
true + override def onShutdown() { + shutdown += 1 + } + } + // listener1 will throw an exception in shutdown, and listener2 should still get called + SamzaAppMaster.listeners = List(listener1, listener2) + SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) + assert(listener1.shutdown == 1) + assert(listener2.shutdown == 1) + } + + @Test + def testAppMasterShouldShutdownWithInterrupt { + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) + val listener = new YarnAppMasterListener { + var init = 0 + var shutdown = 0 + override def shouldShutdown = false + override def onInit() { + init += 1 + } + override def onShutdown() { + shutdown += 1 + } + } + val thread = new Thread { + override def run { + SamzaAppMaster.listeners = List(listener) + SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) + } + } + thread.start + thread.interrupt + thread.join + assert(listener.init == 1) + assert(listener.shutdown == 1) + } + + @Test + def testAppMasterShouldForwardAllocatedAndCompleteContainers { + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null))))) + val listener = new YarnAppMasterListener { + var allocated = 0 + var complete = 0 + override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "") + override def shouldShutdown = (allocated >= 1 && complete >= 1) + override def onContainerAllocated(container: Container) { + allocated += 1 + } + override def onContainerCompleted(containerStatus: ContainerStatus) { + complete += 1 + } + } + SamzaAppMaster.listeners = List(listener) + SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) + // heartbeat may be triggered for more than once + assert(listener.allocated >= 1) + assert(listener.complete >= 1) + } + + @Test + def testAppMasterShouldReboot { + val amClient = getAmClient(new 
TestAMRMClientImpl(getAppMasterResponse(true, List(), List()))) + val listener = new YarnAppMasterListener { + var reboot = 0 + override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "") + override def shouldShutdown = reboot >= 1 + override def onReboot() { + reboot += 1 + } + } + SamzaAppMaster.listeners = List(listener) + SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1) + // heartbeat may be triggered for more than once + assert(listener.reboot >= 1) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala index 4ee77e8..cce63eb 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala @@ -18,28 +18,26 @@ */ package org.apache.samza.job.yarn -import org.junit.Assert._ -import org.junit.Test + +import java.nio.ByteBuffer + +import scala.annotation.elidable +import scala.annotation.elidable.ASSERTION + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.hadoop.yarn.api.records.ContainerId -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus -import org.apache.hadoop.yarn.api.records.Priority -import org.apache.hadoop.yarn.api.records.Resource +import org.apache.hadoop.yarn.api.records._ import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -import org.apache.hadoop.yarn.api.records.Resource -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse -import org.apache.hadoop.service._ +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl +import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.samza.SamzaException -import org.apache.hadoop.yarn.api.records.ApplicationAccessType -import java.nio.ByteBuffer +import org.junit.Assert._ +import org.junit.Test +import org.mockito.Mockito class TestSamzaAppMasterLifecycle { - val amClient = new AMRMClientImpl() { + val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) { var host = "" var port = 0 var status: FinalApplicationStatus = null @@ -61,7 +59,6 @@ class TestSamzaAppMasterLifecycle { override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {} } } - override def allocate(progressIndicator: Float): AllocateResponse = null override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) { @@ -70,19 +67,16 @@ class TestSamzaAppMasterLifecycle { override def releaseAssignedContainer(containerId: ContainerId) {} override def getClusterNodeCount() = 1 - override def init(config: Configuration) {} - override def start() {} - override def stop() {} - override def getName(): String = "" - override def getConfig() = null - override def getStartTime() = 0L + override def serviceInit(config: Configuration) {} + override def serviceStart() {} + override def serviceStop() {} } @Test def testLifecycleShouldRegisterOnInit { val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2) state.rpcPort = 1 - val saml = new 
SamzaAppMasterLifecycle(512, 2, state, amClient, new YarnConfiguration) + val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient) saml.onInit assert(amClient.host == "test") assert(amClient.port == 1) @@ -93,7 +87,7 @@ class TestSamzaAppMasterLifecycle { def testLifecycleShouldUnregisterOnShutdown { val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) state.status = FinalApplicationStatus.SUCCEEDED - new SamzaAppMasterLifecycle(128, 1, state, amClient, new YarnConfiguration).onShutdown + new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown assert(amClient.status == FinalApplicationStatus.SUCCEEDED) } @@ -101,7 +95,7 @@ class TestSamzaAppMasterLifecycle { def testLifecycleShouldThrowAnExceptionOnReboot { var gotException = false try { - new SamzaAppMasterLifecycle(368, 1, null, amClient, new YarnConfiguration).onReboot + new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot } catch { // expected case e: SamzaException => gotException = true @@ -113,8 +107,8 @@ class TestSamzaAppMasterLifecycle { def testLifecycleShouldShutdownOnInvalidContainerSettings { val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2) state.rpcPort = 1 - List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration), - new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => { + List(new SamzaAppMasterLifecycle(768, 1, state, amClient), + new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => { saml.onInit assertTrue(saml.shouldShutdown) }) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 9d832ae..7fd80d5 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -18,27 +18,33 @@ */ package org.apache.samza.job.yarn -import org.junit.Assert._ -import org.junit.Test -import org.apache.samza.config.Config -import org.apache.samza.config.MapConfig -import org.apache.samza.Partition -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.util.ConverterUtils + +import scala.annotation.elidable +import scala.annotation.elidable.ASSERTION import scala.collection.JavaConversions._ -import org.apache.hadoop.yarn.conf.YarnConfiguration + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse +import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse +import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -import org.apache.hadoop.fs.Path -import org.apache.hadoop.yarn.api.records.NodeReport -import TestSamzaAppMasterTaskManager._ -import org.apache.samza.system.{SystemStreamPartition, SystemFactory} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils +import org.apache.samza.Partition +import org.apache.samza.config.Config +import org.apache.samza.config.MapConfig import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.util.Util +import org.apache.samza.system.SystemFactory +import org.apache.samza.system.SystemStreamPartition import 
org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin +import org.apache.samza.util.Util +import org.junit.Assert._ +import org.junit.Test + +import TestSamzaAppMasterTaskManager._ object TestSamzaAppMasterTaskManager { def getContainer(containerId: ContainerId) = new Container { @@ -80,7 +86,11 @@ object TestSamzaAppMasterTaskManager { override def setDiagnostics(diagnostics: String) = {} } - def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] { + def getAmClient = (amClient: TestAMRMClientImpl) => new AMRMClientAsyncImpl(amClient, 1, SamzaAppMaster) { + def getClient: TestAMRMClientImpl = amClient + } + + class TestAMRMClientImpl(response: AllocateResponse) extends AMRMClientImpl[ContainerRequest] { var requests: List[ContainerRequest] = List[ContainerRequest]() def getRelease = release @@ -91,13 +101,9 @@ object TestSamzaAppMasterTaskManager { override def addContainerRequest(req: ContainerRequest) { requests ::= req } override def removeContainerRequest(req: ContainerRequest) {} override def getClusterNodeCount() = 1 - - override def init(config: Configuration) {} - override def start() {} - override def stop() {} - override def getName(): String = "" - override def getConfig() = null - override def getStartTime() = 0L + override def serviceInit(config: Configuration) {} + override def serviceStart() {} + override def serviceStop() {} } def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) = @@ -111,10 +117,10 @@ object TestSamzaAppMasterTaskManager { override def getCompletedContainersStatuses() = completed override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {} override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {} - override def getUpdatedNodes = null + override def getUpdatedNodes = List[NodeReport]() override def getNumClusterNodes = 1 override def setNumClusterNodes(num: Int) {} - override def getNMTokens = null + 
override def getNMTokens = List[NMToken]() override def setNMTokens(nmTokens: java.util.List[NMToken]) {} override def setAMCommand(command: AMCommand) {} override def getPreemptionMessage = null @@ -164,7 +170,7 @@ class TestSamzaAppMasterTaskManager { @Test def testAppMasterShouldRequestANewContainerWhenATaskFails { - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) { override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) { @@ -179,20 +185,20 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here")) assert(taskManager.shouldShutdown == false) // 2. First is from onInit, second is from onContainerCompleted, since it failed. 
- assertEquals(2, amClient.requests.size) - assertEquals(0, amClient.getRelease.size) + assertEquals(2, amClient.getClient.requests.size) + assertEquals(0, amClient.getClient.getRelease.size) assertFalse(taskManager.shouldShutdown) // Now trigger an AM shutdown since our retry count is 1, and we're failing twice taskManager.onContainerAllocated(getContainer(container2)) taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here")) - assertEquals(2, amClient.requests.size) - assertEquals(0, amClient.getRelease.size) + assertEquals(2, amClient.getClient.requests.size) + assertEquals(0, amClient.getClient.getRelease.size) assertTrue(taskManager.shouldShutdown) } @Test def testAppMasterShouldRequestANewContainerWhenATaskIsReleased { - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) state.taskCount = 2 var containersRequested = 0 @@ -213,8 +219,8 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == false) taskManager.onInit assert(taskManager.shouldShutdown == false) - assert(amClient.requests.size == 1) - assert(amClient.getRelease.size == 0) + assert(amClient.getClient.requests.size == 1) + assert(amClient.getClient.getRelease.size == 0) // allocate container 2 taskManager.onContainerAllocated(getContainer(container2)) @@ -231,13 +237,13 @@ class TestSamzaAppMasterTaskManager { assert(state.runningTasks.size == 1) assert(state.taskPartitions.size == 1) assert(state.unclaimedTasks.size == 0) - assert(amClient.requests.size == 1) - assert(amClient.getRelease.size == 1) - assert(amClient.getRelease.head.equals(container3)) + assert(amClient.getClient.requests.size == 1) + assert(amClient.getClient.getRelease.size == 1) + 
assert(amClient.getClient.getRelease.head.equals(container3)) // reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources - amClient.requests = List() - amClient.resetRelease + amClient.getClient.requests = List() + amClient.getClient.resetRelease // now release the container, and make sure the AM doesn't ask for more assert(taskManager.shouldShutdown == false) @@ -247,15 +253,15 @@ class TestSamzaAppMasterTaskManager { assert(state.runningTasks.size == 1) assert(state.taskPartitions.size == 1) assert(state.unclaimedTasks.size == 0) - assert(amClient.requests.size == 0) - assert(amClient.getRelease.size == 0) + assert(amClient.getClient.requests.size == 0) + assert(amClient.getClient.getRelease.size == 0) // pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container assert(taskManager.shouldShutdown == false) taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure")) assert(taskManager.shouldShutdown == false) - assert(amClient.requests.size == 1) - assert(amClient.getRelease.size == 0) + assert(amClient.getClient.requests.size == 1) + assert(amClient.getClient.getRelease.size == 0) } @Test @@ -263,7 +269,7 @@ class TestSamzaAppMasterTaskManager { val map = new java.util.HashMap[String, String](config) map.put("yarn.container.count", "2") val newConfig = new MapConfig(map) - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) state.taskCount = 2 var containersStarted = 0 @@ -278,8 +284,8 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == false) taskManager.onInit assert(taskManager.shouldShutdown == false) - 
assert(amClient.requests.size == 2) - assert(amClient.getRelease.size == 0) + assert(amClient.getClient.requests.size == 2) + assert(amClient.getClient.getRelease.size == 0) taskManager.onContainerAllocated(getContainer(container2)) assert(state.neededContainers == 1) assert(state.runningTasks.size == 1) @@ -330,7 +336,7 @@ class TestSamzaAppMasterTaskManager { @Test def testAppMasterShouldReleaseExtraContainers { - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) + val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List()))) val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2) var containersRequested = 0 var containersStarted = 0 @@ -350,8 +356,8 @@ class TestSamzaAppMasterTaskManager { assert(taskManager.shouldShutdown == false) taskManager.onInit assert(taskManager.shouldShutdown == false) - assert(amClient.requests.size == 1) - assert(amClient.getRelease.size == 0) + assert(amClient.getClient.requests.size == 1) + assert(amClient.getClient.getRelease.size == 0) assert(state.neededContainers == 1) assert(state.runningTasks.size == 0) assert(state.taskPartitions.size == 0) @@ -370,9 +376,9 @@ class TestSamzaAppMasterTaskManager { assert(state.unclaimedTasks.size == 0) assert(containersRequested == 1) assert(containersStarted == 1) - assert(amClient.requests.size == 1) - assert(amClient.getRelease.size == 1) - assert(amClient.getRelease.head.equals(container3)) + assert(amClient.getClient.requests.size == 1) + assert(amClient.getClient.getRelease.size == 1) + assert(amClient.getClient.getRelease.head.equals(container3)) } @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala deleted file mode 100644 index 98f7844..0000000 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.job.yarn -import org.junit.Assert._ -import org.junit.Test -import TestSamzaAppMasterTaskManager._ -import org.apache.hadoop.yarn.api.records.Container -import org.apache.hadoop.yarn.api.records.ContainerStatus -import org.apache.hadoop.yarn.api.records.ResourceRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.records.ContainerId -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse - -class TestYarnAppMaster { - @Test - def testAppMasterShouldShutdown { - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) - val listener = new YarnAppMasterListener { - var init = 0 - var shutdown = 0 - var allocated = 0 - var complete = 0 - override def shouldShutdown = true - override def onInit() { - init += 1 - } - override def onShutdown() { - shutdown += 1 - } - override def onContainerAllocated(container: Container) { - allocated += 1 - } - override def onContainerCompleted(containerStatus: ContainerStatus) { - complete += 1 - } - } - new YarnAppMaster(List(listener), amClient).run - assert(listener.init == 1) - assert(listener.shutdown == 1) - } - - @Test - def testAppMasterShouldShutdownWithFailingListener { - val amClient = getAmClient(getAppMasterResponse(false, List(), List())) - val listener1 = new YarnAppMasterListener { - var shutdown = 0 - override def shouldShutdown = true - override def onShutdown() { - shutdown += 1 - throw new RuntimeException("Some weird failure") - } - } - val listener2 = new YarnAppMasterListener { - var shutdown = 0 - override def shouldShutdown = true - override def onShutdown() { - shutdown += 1 - } - } - // listener1 will throw an exception in shutdown, and listener2 should still get called - new YarnAppMaster(List(listener1, listener2), amClient).run - assert(listener1.shutdown == 1) - assert(listener2.shutdown == 1) - } - - @Test - def testAppMasterShouldShutdownWithInterrupt { - val amClient = 
getAmClient(getAppMasterResponse(false, List(), List())) - val listener = new YarnAppMasterListener { - var init = 0 - var shutdown = 0 - override def shouldShutdown = false - override def onInit() { - init += 1 - } - override def onShutdown() { - shutdown += 1 - } - } - val am = new YarnAppMaster(List(listener), amClient) - val thread = new Thread { - override def run { - am.run - } - } - thread.start - thread.interrupt - thread.join - assert(listener.init == 1) - assert(listener.shutdown == 1) - } - - @Test - def testAppMasterShouldForwardAllocatedAndCompleteContainers { - val amClient = getAmClient(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null)))) - val listener = new YarnAppMasterListener { - var allocated = 0 - var complete = 0 - override def shouldShutdown = (allocated == 1 && complete == 1) - override def onContainerAllocated(container: Container) { - allocated += 1 - } - override def onContainerCompleted(containerStatus: ContainerStatus) { - complete += 1 - } - } - new YarnAppMaster(List(listener), amClient).run - assert(listener.allocated == 1) - assert(listener.complete == 1) - } - - @Test - def testAppMasterShouldReboot { - val amClient = getAmClient(getAppMasterResponse(true, List(), List())) - val listener = new YarnAppMasterListener { - var reboot = 0 - override def shouldShutdown = reboot == 1 - override def onReboot() { - reboot += 1 - } - } - new YarnAppMaster(List(listener), amClient).run - assert(listener.reboot == 1) - } -}
