This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new e36b2d8c0 Exclude warmed containers in disabled invokers. (#5313) e36b2d8c0 is described below commit e36b2d8c0cd3ef362ea0f6220d9af0371acd10c2 Author: Dominic Kim <styl...@apache.org> AuthorDate: Mon Aug 22 14:08:03 2022 +0900 Exclude warmed containers in disabled invokers. (#5313) * Exclude warmed containers in disabled invokers. * Exclude warmed containers in disabled invokers. * Find the first warmed container. * Remove the code added by mistake. * Add more logs for error cases. --- .../apache/openwhisk/core/connector/Message.scala | 17 + .../core/invoker/FPCInvokerReactive.scala | 16 +- .../scheduler/container/ContainerManager.scala | 383 ++++++++++------- .../container/test/ContainerManagerTests.scala | 464 +++++++++++++++++---- 4 files changed, 627 insertions(+), 253 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index d3c550eb2..b65b11496 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -636,6 +636,23 @@ object ContainerMessage extends DefaultJsonProtocol { sealed trait ContainerCreationError object ContainerCreationError extends Enumeration { + import scala.language.implicitConversions + implicit def containerCreationErrorToString(x: ContainerCreationError): String = { + x match { + case NoAvailableInvokersError => "no available invoker is found" + case NoAvailableResourceInvokersError => "no available invoker with the resources is found: " + case ResourceNotEnoughError => "invoker(s) have not enough resources" + case WhiskError => "whisk error(recoverable) happens" + case UnknownError => "a unknown error happens" + case TimeoutError => "a timeout error happens" + case ShuttingDownError => "shutting down error happens" + 
case NonExecutableActionError => "no executable found for the action" + case DBFetchError => "an error happens while fetching data from DB" + case BlackBoxError => "a blackbox error happens" + case ZeroNamespaceLimit => "the namespace has 0 limit configured" + case TooManyConcurrentRequests => "too many concurrent requests are in flight." + } + } case object NoAvailableInvokersError extends ContainerCreationError diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index e0393c056..e38c39701 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -376,18 +376,6 @@ class FPCInvokerReactive(config: WhiskConfig, override def enable(): Route = { invokerHealthManager ! Enable pool ! Enable - // re-enable consumer - if (consumer.isEmpty) - consumer = Some( - new ContainerMessageConsumer( - instance, - pool, - entityStore, - cfg, - msgProvider, - longPollDuration = 1.second, - maxPeek, - sendAckToScheduler)) warmUp() complete("Success enable invoker") } @@ -395,15 +383,13 @@ class FPCInvokerReactive(config: WhiskConfig, override def disable(): Route = { invokerHealthManager ! GracefulShutdown pool ! 
GracefulShutdown - consumer.foreach(_.close()) - consumer = None warmUpWatcher.foreach(_.close()) warmUpWatcher = None complete("Successfully disabled invoker") } override def isEnabled(): Route = { - complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize()) + complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize()) } override def backfillPrewarm(): Route = { diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala index 837045ed2..038dbe09c 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala @@ -16,62 +16,44 @@ */ package org.apache.openwhisk.core.scheduler.container -import java.nio.charset.StandardCharsets -import java.util.concurrent.ThreadLocalRandom import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} import akka.event.Logging.InfoLevel import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy} -import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId} +import org.apache.openwhisk.common._ import org.apache.openwhisk.core.connector.ContainerCreationError.{ + containerCreationErrorToString, NoAvailableInvokersError, NoAvailableResourceInvokersError } import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.entity.size._ -import org.apache.openwhisk.core.entity.{ - Annotations, - ByteSize, - DocRevision, - FullyQualifiedEntityName, - InvokerInstanceId, - MemoryLimit, - SchedulerInstanceId -} +import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import 
org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys} import org.apache.openwhisk.core.etcd.EtcdType._ import org.apache.openwhisk.core.scheduler.Scheduler -import org.apache.openwhisk.core.scheduler.message.{ - ContainerCreation, - ContainerDeletion, - ContainerKeyMeta, - CreationJobState, - FailedCreationJob, - RegisterCreationJob, - ReschedulingCreationJob, - SuccessfulCreationJob -} +import org.apache.openwhisk.core.scheduler.container.ContainerManager.{sendState, updateInvokerMemory} +import org.apache.openwhisk.core.scheduler.message._ import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool} -import org.apache.openwhisk.core.service.{ - DeleteEvent, - PutEvent, - UnwatchEndpoint, - WatchEndpoint, - WatchEndpointInserted, - WatchEndpointRemoved -} +import org.apache.openwhisk.core.service._ import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig} -import pureconfig.generic.auto._ import pureconfig.loadConfigOrThrow import spray.json.DefaultJsonProtocol._ +import pureconfig.generic.auto._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.ThreadLocalRandom +import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.collection.concurrent.TrieMap +import scala.collection.immutable +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.util.{Failure, Success} -case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId) +case class ScheduledPair(msg: ContainerCreationMessage, + invokerId: Option[InvokerInstanceId], + err: Option[ContainerCreationError] = None) case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double) @@ -157,43 +139,64 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef, case _ => } - private def createContainer(msgs: List[ContainerCreationMessage], - memory: ByteSize, - 
invocationNamespace: String): Unit = { + private def createContainer(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)( + implicit logging: Logging): Unit = { logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]") - val coldCreations = filterWarmedCreations(msgs) - if (coldCreations.nonEmpty) - ContainerManager - .getAvailableInvokers(etcdClient, memory, invocationNamespace) - .flatMap { invokers => - if (invokers.isEmpty) { - coldCreations.foreach { msg => - ContainerManager.sendState( - FailedCreationJob( - msg.creationId, - msg.invocationNamespace, - msg.action, - msg.revision, - NoAvailableInvokersError, - s"No available invokers.")) - } - Future.failed(NoCapacityException("No available invokers.")) - } else { - coldCreations.foreach { msg => - creationJobManager ! RegisterCreationJob(msg) - } + ContainerManager + .getAvailableInvokers(etcdClient, memory, invocationNamespace) + .foreach { invokers => + if (invokers.isEmpty) { + logging.error(this, "there is no available invoker to schedule.") + msgs.foreach(ContainerManager.sendState(_, NoAvailableInvokersError, NoAvailableInvokersError)) + } else { + val (coldCreations, warmedCreations) = + ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs) + + // handle warmed creation + val chosenInvokers: immutable.Seq[Option[(Int, ContainerCreationMessage)]] = warmedCreations.map { + warmedCreation => + // update the in-progress map for warmed containers. + // even if it is done in the filterWarmedCreations method, it is still necessary to apply the change to the original map. + warmedCreation._3.foreach(inProgressWarmedContainers.update(warmedCreation._1.creationId.asString, _)) + + // send creation message to the target invoker. + warmedCreation._2 map { chosenInvoker => + val msg = warmedCreation._1 + creationJobManager ! 
RegisterCreationJob(msg) + sendCreationContainerToInvoker(messagingProducer, chosenInvoker, msg) + (chosenInvoker, msg) + } + } - Future { - ContainerManager - .schedule(invokers, coldCreations, memory) - .map { pair => - sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg) - } + // update the resource usage of invokers to apply changes from warmed creations. + val updatedInvokers = chosenInvokers.foldLeft(invokers) { (invokers, chosenInvoker) => + chosenInvoker match { + case Some((chosenInvoker, msg)) => + updateInvokerMemory(chosenInvoker, msg.whiskActionMetaData.limits.memory.megabytes, invokers) + case err => + // this is not supposed to happen. + logging.error(this, s"warmed creation is scheduled but no invoker is chosen: $err") + invokers } - }.andThen { - case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t") } + + // handle cold creations + ContainerManager + .schedule(updatedInvokers, coldCreations.map(_._1), memory) + .map { pair => + pair.invokerId match { + // an invoker is assigned for the msg + case Some(instanceId) => + creationJobManager ! RegisterCreationJob(pair.msg) + sendCreationContainerToInvoker(messagingProducer, instanceId.instance, pair.msg) + + // if a chosen invoker does not exist, it means it failed to find a matching invoker for the msg. 
+ case _ => + pair.err.foreach(error => sendState(pair.msg, error, error)) + } + } } + } } private def getInvokersWithOldContainer(invocationNamespace: String, @@ -248,31 +251,6 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef, ContainerKeyMeta(revision, invokerId, containerId) } - // Filter out messages which can use warmed container - private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = { - msgs.filter { msg => - val warmedPrefix = - containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision)) - val chosenInvoker = warmedContainers - .filter(!inProgressWarmedContainers.values.toSeq.contains(_)) - .find { container => - if (container.startsWith(warmedPrefix)) { - logging.info(this, s"Choose a warmed container $container") - inProgressWarmedContainers.update(msg.creationId.asString, container) - true - } else - false - } - .map(_.split("/").takeRight(3).apply(0)) - if (chosenInvoker.nonEmpty) { - creationJobManager ! RegisterCreationJob(msg) - sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg) - false - } else - true - } - } - private def sendCreationContainerToInvoker(producer: MessageProducer, invoker: Int, msg: ContainerCreationMessage): Future[ResultMetadata] = { @@ -360,6 +338,87 @@ object ContainerManager { */ def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod) + // Partition messages that can use warmed containers. 
+ // return: (list of messages that cannot use warmed containers, list of messages that can take advantage of warmed containers) + protected[container] def filterWarmedCreations(warmedContainers: Set[String], + inProgressWarmedContainers: TrieMap[String, String], + invokers: List[InvokerHealth], + msgs: List[ContainerCreationMessage])( + implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], Option[String])], + List[(ContainerCreationMessage, Option[Int], Option[String])]) = { + val warmedApplied = msgs.map { msg => + val warmedPrefix = + containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision)) + val container = warmedContainers + .filter(!inProgressWarmedContainers.values.toSeq.contains(_)) + .find { container => + if (container.startsWith(warmedPrefix)) { + logging.info(this, s"Choose a warmed container $container") + + // this is required to exclude already chosen invokers + inProgressWarmedContainers.update(msg.creationId.asString, container) + true + } else + false + } + + // chosenInvoker is supposed to have only one item + val chosenInvoker = container + .map(_.split("/").takeRight(3).apply(0)) + // filter warmed containers in disabled invokers + .filter( + invoker => + invokers + // filterWarmedCreations method is supposed to receive healthy invokers only but this will make sure again only healthy invokers are used. 
+ .filter(invoker => invoker.status.isUsable) + .map(_.id.instance) + .contains(invoker.toInt)) + + if (chosenInvoker.nonEmpty && container.nonEmpty) { + (msg, Some(chosenInvoker.get.toInt), Some(container.get)) + } else + (msg, None, None) + } + + warmedApplied.partition { item => + if (item._2.nonEmpty) false + else true + } + } + + protected[container] def updateInvokerMemory(invokerId: Int, + requiredMemory: Long, + invokers: List[InvokerHealth]): List[InvokerHealth] = { + // it must be compared to the instance unique id + val index = invokers.indexOf(invokers.filter(p => p.id.instance == invokerId).head) + val invoker = invokers(index) + + // if the invoker has less than minimum memory, drop it from the list. + if (invoker.id.userMemory.toMB - requiredMemory < MemoryLimit.MIN_MEMORY.toMB) { + // drop the nth element + val split = invokers.splitAt(index) + val _ :: t1 = split._2 + split._1 ::: t1 + } else { + invokers.updated( + index, + invoker.copy(id = invoker.id.copy(userMemory = invoker.id.userMemory - requiredMemory.MB))) + } + } + + protected[container] def updateInvokerMemory(invokerId: Option[InvokerInstanceId], + requiredMemory: Long, + invokers: List[InvokerHealth]): List[InvokerHealth] = { + invokerId match { + case Some(instanceId) => + updateInvokerMemory(instanceId.instance, requiredMemory, invokers) + + case None => + // do nothing + invokers + } + } + /** * Assign an invoker to a message * @@ -395,64 +454,92 @@ object ContainerManager { val resourcesStrictPolicy = msg.whiskActionMetaData.annotations .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName) .getOrElse(true) - val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false) + val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull) if (requiredResources.isEmpty) { // only choose managed invokers or blackbox invokers val wantedInvokers = if (isBlackboxInvocation) { - candidates.filter(c 
=> blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet + logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for blackbox invokers to schedule.") + candidates + .filter( + c => + blackboxInvokers + .map(b => b.id.instance) + .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes) + .toSet } else { - candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet + logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for managed invokers to schedule.") + candidates + .filter( + c => + managedInvokers + .map(m => m.id.instance) + .contains(c.id.instance) && c.id.userMemory.toMB >= msg.whiskActionMetaData.limits.memory.megabytes) + .toSet } val taggedInvokers = candidates.filter(_.id.tags.nonEmpty) if (wantedInvokers.nonEmpty) { - chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg) + val scheduledPair = chooseInvokerFromCandidates(wantedInvokers.toList, msg) + val updatedInvokers = + updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers) + (scheduledPair :: pairs, updatedInvokers) } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then - chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg) + logging.info( + this, + s"[${msg.invocationNamespace}/${msg.action}] since there is no available non-tagged invoker, choose one among tagged invokers.") + val scheduledPair = chooseInvokerFromCandidates(taggedInvokers, msg) + val updatedInvokers = + updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers) + (scheduledPair :: pairs, updatedInvokers) } else { - sendState( - FailedCreationJob( - msg.creationId, - msg.invocationNamespace, - msg.action, - msg.revision, - NoAvailableInvokersError, - s"No available invokers.")) - (pairs, candidates) + logging.error( + this, + 
s"[${msg.invocationNamespace}/${msg.action}] there is no invoker available to schedule to schedule.") + val scheduledPair = + ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError)) + (scheduledPair :: pairs, invokers) } } else { + logging.info(this, s"[${msg.invocationNamespace}/${msg.action}] looking for tagged invokers to schedule.") val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet)) if (wantedInvokers.nonEmpty) { - chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg) + val scheduledPair = chooseInvokerFromCandidates(wantedInvokers, msg) + val updatedInvokers = + updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers) + (scheduledPair :: pairs, updatedInvokers) } else if (resourcesStrictPolicy) { - sendState( - FailedCreationJob( - msg.creationId, - msg.invocationNamespace, - msg.action, - msg.revision, - NoAvailableResourceInvokersError, - s"No available invokers with resources $requiredResources.")) - (pairs, candidates) + logging.error( + this, + s"[${msg.invocationNamespace}/${msg.action}] there is no available invoker with the resource: ${requiredResources}") + val scheduledPair = + ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError)) + (scheduledPair :: pairs, invokers) } else { + logging.info( + this, + s"[${msg.invocationNamespace}/${msg.action}] since there is no available invoker with the resource, choose any invokers without the resource.") val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty) if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first - chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg) + val scheduledPair = chooseInvokerFromCandidates(noTaggedInvokers, msg) + val updatedInvokers = + updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers) + (scheduledPair :: pairs, updatedInvokers) } 
else { val leftInvokers = taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet)) - if (leftInvokers.nonEmpty) - chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg) - else { - sendState( - FailedCreationJob( - msg.creationId, - msg.invocationNamespace, - msg.action, - msg.revision, - NoAvailableInvokersError, - s"No available invokers.")) - (pairs, candidates) + if (leftInvokers.nonEmpty) { + val scheduledPair = chooseInvokerFromCandidates(leftInvokers, msg) + val updatedInvokers = + updateInvokerMemory( + scheduledPair.invokerId, + msg.whiskActionMetaData.limits.memory.megabytes, + invokers) + (scheduledPair :: pairs, updatedInvokers) + } else { + logging.error(this, s"[${msg.invocationNamespace}/${msg.action}] no available invoker is found") + val scheduledPair = + ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError)) + (scheduledPair :: pairs, invokers) } } } @@ -462,32 +549,30 @@ object ContainerManager { list } - private def chooseInvokerFromCandidates( - candidates: List[InvokerHealth], - wholeInvokers: List[InvokerHealth], - pairs: List[ScheduledPair], - msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = { - val idx = rng(mod = candidates.size) - val instance = candidates(idx) - // it must be compared to the instance unique id - val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head) - val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes - val updated = - if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares. 
- wholeInvokers.updated( - idxInWhole, - instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB))) + @tailrec + protected[container] def chooseInvokerFromCandidates(candidates: List[InvokerHealth], msg: ContainerCreationMessage)( + implicit logging: Logging): ScheduledPair = { + val requiredMemory = msg.whiskActionMetaData.limits.memory + if (candidates.isEmpty) { + ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError)) + } else if (candidates.forall(p => p.id.userMemory.toMB < requiredMemory.megabytes)) { + ScheduledPair(msg, invokerId = None, Some(NoAvailableResourceInvokersError)) + } else { + val idx = rng(mod = candidates.size) + val instance = candidates(idx) + if (instance.id.userMemory.toMB < requiredMemory.megabytes) { + val split = candidates.splitAt(idx) + val _ :: t1 = split._2 + chooseInvokerFromCandidates(split._1 ::: t1, msg) } else { - // drop the nth element - val split = wholeInvokers.splitAt(idxInWhole) - val _ :: t = split._2 - split._1 ::: t + ScheduledPair(msg, invokerId = Some(instance.id)) } - - (ScheduledPair(msg, instance.id) :: pairs, updated) + } } - private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = { + private def sendState(msg: ContainerCreationMessage, err: ContainerCreationError, reason: String)( + implicit logging: Logging): Unit = { + val state = FailedCreationJob(msg.creationId, msg.invocationNamespace, msg.action, msg.revision, err, reason) QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match { case Some(memoryQueueValue) if memoryQueueValue.isLeader => memoryQueueValue.queue ! 
state diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala index bf7f14d58..d1f43270b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala @@ -39,15 +39,16 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys} import org.apache.openwhisk.core.etcd.EtcdType._ import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig} -import org.apache.openwhisk.core.scheduler.container._ +import org.apache.openwhisk.core.scheduler.container.{ScheduledPair, _} import org.apache.openwhisk.core.scheduler.message.{ ContainerCreation, ContainerDeletion, FailedCreationJob, RegisterCreationJob, - ReschedulingCreationJob, - SuccessfulCreationJob + ReschedulingCreationJob } + +import scala.language.postfixOps import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool} import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved} import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} @@ -60,6 +61,7 @@ import pureconfig.loadConfigOrThrow import spray.json.{JsArray, JsBoolean, JsString} import pureconfig.generic.auto._ +import scala.collection.concurrent.TrieMap import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration.{FiniteDuration, _} @@ -182,8 +184,8 @@ class ContainerManagerTests val msg = InvokerResourceMessage( invoker.status.asString, invoker.id.userMemory.toMB, - invoker.id.userMemory.toMB, - invoker.id.userMemory.toMB, + invoker.id.busyMemory.getOrElse(0.MB).toMB, + 0, invoker.id.tags, invoker.id.dedicatedNamespaces) @@ -271,16 +273,22 @@ class 
ContainerManagerTests it should "try warmed containers first" in { val mockEtcd = mock[EtcdClient] - // for test, only invoker2 is healthy, so that no-warmed creations can be only sent to invoker2 + // at first, invoker states look like this. val invokers: List[InvokerHealth] = List( - InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy), - InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy), - InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 512.MB, tags = Seq.empty[String]), Healthy), ) - expectGetInvokers(mockEtcd, invokers) - expectGetInvokers(mockEtcd, invokers) - expectGetInvokers(mockEtcd, invokers) - expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` for 3 times, and another one for warmup + + // after then, invoker states changes like this. 
+ val updatedInvokers: List[InvokerHealth] = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq.empty[String]), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 256.MB, tags = Seq.empty[String]), Healthy), + ) + expectGetInvokers(mockEtcd, invokers) // for warm up + expectGetInvokers(mockEtcd, invokers) // for first creation + expectGetInvokers(mockEtcd, updatedInvokers) // for second creation val mockJobManager = TestProbe() val mockWatcher = TestProbe() @@ -294,7 +302,7 @@ class ContainerManagerTests system.actorOf(ContainerManager .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref)) - // there are 1 warmed container for `test-namespace/test-action` and 1 for `test-namespace/test-action-2` + // Add warmed containers for action1 and action2 in invoker0 and invoker1 respectively manager ! WatchEndpointInserted( ContainerKeys.warmedPrefix, ContainerKeys.warmedContainers( @@ -349,9 +357,9 @@ class ContainerManagerTests val msgs = List(msg1, msg2, msg3) // it should reuse 2 warmed containers - manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace) + manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace) - // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker + // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the remainder receiver.expectMsg(s"invoker0-$msg1") receiver.expectMsg(s"invoker1-$msg2") receiver.expectMsg(s"invoker2-$msg3") @@ -362,28 +370,7 @@ class ContainerManagerTests case RegisterCreationJob(`msg3`) => true } - // now warmed container for action2 become warmed again - manager ! SuccessfulCreationJob(msg2.creationId, msg2.invocationNamespace, msg2.action, msg2.revision) - manager ! 
SuccessfulCreationJob(msg3.creationId, msg3.invocationNamespace, msg3.action, msg3.revision) - // it still need to use invoker2 - manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace) - receiver.expectMsg(s"invoker2-$msg1") - // it will use warmed container on invoker1 - manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace) - receiver.expectMsg(s"invoker1-$msg2") - - // warmed container for action1 become warmed when received FailedCreationJob - manager ! FailedCreationJob( - msg1.creationId, - msg1.invocationNamespace, - msg1.action, - msg1.revision, - NoAvailableResourceInvokersError, - "") - manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace) - receiver.expectMsg(s"invoker0-$msg1") - - // warmed container for action1 become unwarmed + // remove a warmed container from invoker0 manager ! WatchEndpointRemoved( ContainerKeys.warmedPrefix, ContainerKeys.warmedContainers( @@ -394,8 +381,56 @@ class ContainerManagerTests ContainerId("fake")), "", true) - manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace) - receiver.expectMsg(s"invoker2-$msg1") + + // remove a warmed container from invoker1 + manager ! WatchEndpointRemoved( + ContainerKeys.warmedPrefix, + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + InvokerInstanceId(1, userMemory = 0.bytes), + ContainerId("fake")), + "", + true) + + // create a warmed container for action1 in from invoker1 + manager ! WatchEndpointInserted( + ContainerKeys.warmedPrefix, + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn, + testRevision, + InvokerInstanceId(1, userMemory = 0.bytes), + ContainerId("fake")), + "", + true) + + // create a warmed container for action2 in from invoker2 + manager ! 
WatchEndpointInserted( + ContainerKeys.warmedPrefix, + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + InvokerInstanceId(2, userMemory = 0.bytes), + ContainerId("fake")), + "", + true) + + // it should reuse 2 warmed containers + manager ! ContainerCreation(msgs, 256.MB, testInvocationNamespace) + + // msg1 will use warmed container on invoker1, msg2 use warmed container on invoker2, msg3 use the remainder + receiver.expectMsg(s"invoker1-$msg1") + receiver.expectMsg(s"invoker2-$msg2") + receiver.expectMsg(s"invoker0-$msg3") + + mockJobManager.expectMsgPF() { + case RegisterCreationJob(`msg1`) => true + case RegisterCreationJob(`msg2`) => true + case RegisterCreationJob(`msg3`) => true + } } it should "not try warmed containers if revision is unmatched" in { @@ -521,7 +556,7 @@ class ContainerManagerTests }) } - it should "choice invokers" in { + it should "choose invokers" in { val healthyInvokers: List[InvokerHealth] = List( InvokerHealth(InvokerInstanceId(0, userMemory = 512.MB), Healthy), InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB), Healthy), @@ -563,8 +598,8 @@ class ContainerManagerTests val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory) pairs.map(_.msg) should contain theSameElementsAs msgs - pairs.map(_.invokerId).foreach { - healthyInvokers.map(_.id) should contain(_) + pairs.flatMap(_.invokerId).foreach { invokerId => + healthyInvokers.map(_.id) should contain(invokerId) } } @@ -607,7 +642,7 @@ class ContainerManagerTests val pairs = ContainerManager.schedule(healthyInvokers, msgs, minMemory) pairs.map(_.msg) should contain theSameElementsAs msgs - pairs.map(_.invokerId.instance).foreach { + pairs.map(_.invokerId.get.instance).foreach { healthyInvokers.map(_.id.instance) should contain(_) } } @@ -692,21 +727,14 @@ class ContainerManagerTests List(msg1, msg2, msg3, msg4, msg5), msg1.whiskActionMetaData.limits.memory.megabytes.MB) // the memory 
is same for all msgs pairs should contain theSameElementsAs List( - ScheduledPair(msg1, healthyInvokers(0).id), - ScheduledPair(msg2, healthyInvokers(1).id), - ScheduledPair(msg3, healthyInvokers(2).id), - ScheduledPair(msg4, healthyInvokers(3).id)) - probe.expectMsg( - FailedCreationJob( - msg5.creationId, - testInvocationNamespace, - msg5.action, - testRevision, - NoAvailableResourceInvokersError, - "No available invokers with resources List(fake).")) + ScheduledPair(msg1, Some(healthyInvokers(0).id), None), + ScheduledPair(msg2, Some(healthyInvokers(1).id), None), + ScheduledPair(msg3, Some(healthyInvokers(2).id), None), + ScheduledPair(msg4, Some(healthyInvokers(3).id), None), + ScheduledPair(msg5, None, Some(NoAvailableResourceInvokersError))) } - it should "choose tagged invokers when no invokers available which has no tags first" in { + it should "choose tagged invokers when no untagged invoker is available" in { val msg = ContainerCreationMessage( TransactionId.testing, @@ -740,16 +768,9 @@ class ContainerManagerTests // and for msg2, it should return no available invokers val pairs = ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB) - pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id)) - - probe.expectMsg( - FailedCreationJob( - msg2.creationId, - testInvocationNamespace, - msg2.action, - testRevision, - NoAvailableInvokersError, - "No available invokers.")) + pairs should contain theSameElementsAs List( + ScheduledPair(msg, Some(healthyInvokers(0).id), None), + ScheduledPair(msg2, None, Some(NoAvailableInvokersError))) } it should "respect the resource policy while use resource filter" in { @@ -788,7 +809,7 @@ class ContainerManagerTests testfqn.resolve(EntityName("ns3")), testRevision, actionMetadata.copy( - limits = action.limits.copy(memory = MemoryLimit(512.MB)), + limits = action.limits.copy(memory = MemoryLimit(256.MB)), annotations = 
Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters( Annotations.InvokerResourcesStrictPolicyAnnotationName, @@ -803,7 +824,7 @@ class ContainerManagerTests testfqn.resolve(EntityName("ns3")), testRevision, actionMetadata.copy( - limits = action.limits.copy(memory = MemoryLimit(512.MB)), + limits = action.limits.copy(memory = MemoryLimit(256.MB)), annotations = Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters( Annotations.InvokerResourcesStrictPolicyAnnotationName, @@ -824,21 +845,13 @@ class ContainerManagerTests // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error val pairs = ContainerManager.schedule(healthyInvokers, List(msg1), msg1.whiskActionMetaData.limits.memory.megabytes.MB) - pairs.size shouldBe 0 - probe.expectMsg( - FailedCreationJob( - msg1.creationId, - testInvocationNamespace, - msg1.action, - testRevision, - NoAvailableResourceInvokersError, - "No available invokers with resources List(non-exist).")) + pairs should contain theSameElementsAs List(ScheduledPair(msg1, None, Some(NoAvailableResourceInvokersError))) // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first, // here is the invoker0 val pairs2 = ContainerManager.schedule(healthyInvokers, List(msg2), msg2.whiskActionMetaData.limits.memory.megabytes.MB) - pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id)) + pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, Some(healthyInvokers(0).id), None)) // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first, // if there is none, then choose invokers with other tags, if there is still none, return no available invokers @@ -846,15 +859,9 @@ class ContainerManagerTests healthyInvokers.takeRight(1), List(msg3, msg4), 
msg3.whiskActionMetaData.limits.memory.megabytes.MB) - pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id)) - probe.expectMsg( - FailedCreationJob( - msg4.creationId, - testInvocationNamespace, - msg4.action, - testRevision, - NoAvailableInvokersError, - "No available invokers.")) + pairs3 should contain theSameElementsAs List( + ScheduledPair(msg3, Some(healthyInvokers(1).id)), + ScheduledPair(msg4, None, Some(NoAvailableInvokersError))) } it should "send FailedCreationJob to queue manager when no invokers are available" in { @@ -900,7 +907,7 @@ class ContainerManagerTests msg.action, testRevision, NoAvailableInvokersError, - "No available invokers.")) + NoAvailableInvokersError)) } it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in { @@ -1155,6 +1162,285 @@ class ContainerManagerTests case _ => false } } + + it should "choose an invoker from candidates" in { + val candidates = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy), + ) + val msg = ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + // no matter how many time we schedule the msg, it should always choose invoker2. 
+ (1 to 10).foreach { _ => + val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg) + newPairs.invokerId shouldBe Some(InvokerInstanceId(2, userMemory = 256 MB)) + } + } + + it should "not choose an invoker when there is no candidate with enough memory" in { + val candidates = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 128 MB), Healthy), + ) + val msg = ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + // no matter how many time we schedule the msg, no invoker should be assigned. + (1 to 10).foreach { _ => + val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg) + newPairs.invokerId shouldBe None + } + } + + it should "not choose an invoker when there is no candidate" in { + val candidates = List() + val msg = ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg) + newPairs.invokerId shouldBe None + } + + it should "update invoker memory" in { + val invokers = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy), + ) + val expected = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 768 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy), + ) + val requiredMemory = 256.MB.toMB + val invokerId 
= Some(InvokerInstanceId(1, userMemory = 1024 MB)) + + val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers) + + updatedInvokers shouldBe expected + } + + it should "not update invoker memory when no invoker is assigned" in { + val invokers = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy), + ) + val requiredMemory = 256.MB.toMB + + val updatedInvokers = ContainerManager.updateInvokerMemory(None, requiredMemory, invokers) + + updatedInvokers shouldBe invokers + } + + it should "drop an invoker with less memory than MIN_MEMORY" in { + val invokers = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(1, userMemory = 320 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy), + ) + val expected = List( + InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy), + ) + val requiredMemory = 256.MB.toMB + val invokerId = Some(InvokerInstanceId(1, userMemory = 320 MB)) + + val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, requiredMemory, invokers) + + updatedInvokers shouldBe expected + } + + it should "filter warmed creations when there is no warmed container" in { + + val warmedContainers = Set.empty[String] + val inProgressWarmedContainers = TrieMap.empty[String, String] + + val msg1 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg2 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)), + 
testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg3 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + val msgs = List(msg1, msg2, msg3) + + val (coldCreations, warmedCreations) = + ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs) + + warmedCreations.isEmpty shouldBe true + coldCreations.size shouldBe 3 + } + + it should "filter warmed creations when there are warmed containers" in { + val warmedContainers = Set( + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn, + testRevision, + InvokerInstanceId(0, userMemory = 0.bytes), + ContainerId("fake")), + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + InvokerInstanceId(1, userMemory = 0.bytes), + ContainerId("fake"))) + val inProgressWarmedContainers = TrieMap.empty[String, String] + + val msg1 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn, + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg2 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg3 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn, + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + val msgs = List(msg1, msg2, msg3) + + val (coldCreations, warmedCreations) = + ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs) + + warmedCreations.size shouldBe 2 + coldCreations.size shouldBe 1 + + warmedCreations.map(_._1).contains(msg1) shouldBe true + 
warmedCreations.map(_._1).contains(msg2) shouldBe true + coldCreations.map(_._1).contains(msg3) shouldBe true + } + + it should "choose cold creation when warmed containers are in disabled invokers" in { + val warmedContainers = Set( + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn, + testRevision, + InvokerInstanceId(0, userMemory = 0.bytes), + ContainerId("fake")), + ContainerKeys.warmedContainers( + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + InvokerInstanceId(1, userMemory = 0.bytes), + ContainerId("fake"))) + val inProgressWarmedContainers = TrieMap.empty[String, String] + + val msg1 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn, + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg2 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn.copy(name = EntityName("test-action-2")), + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + val msg3 = + ContainerCreationMessage( + TransactionId.testing, + testInvocationNamespace, + testfqn, + testRevision, + actionMetadata, + testsid, + schedulerHost, + rpcPort) + + val msgs = List(msg1, msg2, msg3) + + // unhealthy invokers should not be chosen even if they have warmed containers + val invokers: List[InvokerHealth] = List( + InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy), + InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Unhealthy), + InvokerHealth(InvokerInstanceId(2, userMemory = 1024.MB, tags = Seq.empty[String]), Healthy)) + + val (coldCreations, _) = + ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs) + + coldCreations.size shouldBe 3 + coldCreations.map(_._1).containsSlice(List(msg1, msg2, msg3)) shouldBe true + } } @RunWith(classOf[JUnitRunner])