This is an automated email from the ASF dual-hosted git repository. tysonnorris pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 113a915 Dynamic LoadBalancer load using SpiLoader (#2984) 113a915 is described below commit 113a91500839dd90dfe4ddc74776bd95cad69ec6 Author: kpavel <kpa...@il.ibm.com> AuthorDate: Tue Jan 23 02:48:11 2018 +0200 Dynamic LoadBalancer load using SpiLoader (#2984) * Add a container pool/invoker loadbalancer SPI. * Refactor invoker health tuple to proper type for cleaner SPI interface. Rename LoadBalancerService to ContainerPoolBalancer. Split into two files. --- common/scala/src/main/resources/reference.conf | 1 + .../scala/whisk/core/controller/Controller.scala | 16 +-- .../scala/whisk/core/controller/RestAPIs.scala | 14 +-- ...erService.scala => ContainerPoolBalancer.scala} | 124 ++++++++------------- .../core/loadBalancer/InvokerSupervision.scala | 10 +- .../whisk/core/loadBalancer/LoadBalancer.scala | 83 ++++++++++++++ .../controller/test/ControllerTestCommon.scala | 1 + ...cala => ContainerPoolBalancerObjectTests.scala} | 54 ++++----- .../test/InvokerSupervisionTests.scala | 3 +- 9 files changed, 180 insertions(+), 126 deletions(-) diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf index 45543e5..4530aef 100644 --- a/common/scala/src/main/resources/reference.conf +++ b/common/scala/src/main/resources/reference.conf @@ -3,4 +3,5 @@ whisk.spi{ MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider + LoadBalancerProvider = whisk.core.loadBalancer.ContainerPoolBalancer } diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index ba58452..53408c3 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ 
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -46,7 +46,7 @@ import whisk.core.entitlement._ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.ExecManifest.Runtimes -import whisk.core.loadBalancer.{LoadBalancerService} +import whisk.core.loadBalancer.LoadBalancerProvider import whisk.http.BasicHttpService import whisk.http.BasicRasService import whisk.spi.SpiLoader @@ -117,7 +117,8 @@ class Controller(val instance: InstanceId, }) // initialize backend services - private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore) + private implicit val loadBalancer = + SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance) private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer) private implicit val activationIdFactory = new ActivationIdGenerator {} private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem) @@ -137,12 +138,13 @@ class Controller(val instance: InstanceId, */ private val internalInvokerHealth = { implicit val executionContext = actorSystem.dispatcher - (path("invokers") & get) { complete { - loadBalancer.allInvokers.map(_.map { - case (instance, state) => s"invoker${instance.toInt}" -> state.asString - }.toMap.toJson.asJsObject) + loadBalancer + .invokerHealth() + .map(_.map { + case i => s"invoker${i.id.toInt}" -> i.status.asString + }.toMap.toJson.asJsObject) } } } @@ -163,7 +165,7 @@ object Controller { Map(WhiskConfig.controllerInstances -> null) ++ ExecManifest.requiredProperties ++ RestApiCommons.requiredProperties ++ - LoadBalancerService.requiredProperties ++ + SpiLoader.get[LoadBalancerProvider].requiredProperties ++ EntitlementProvider.requiredProperties private def info(config: WhiskConfig, runtimes: Runtimes, apis: List[String]) = diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala 
b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala index ffcf01b..161ef4b 100644 --- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala +++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala @@ -46,7 +46,7 @@ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.WhiskAuthStore import whisk.core.entity.types._ -import whisk.core.loadBalancer.LoadBalancerService +import whisk.core.loadBalancer.LoadBalancer import whisk.http.Messages /** @@ -161,7 +161,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( implicit val entityStore: EntityStore, implicit val entitlementProvider: EntitlementProvider, implicit val activationIdFactory: ActivationIdGenerator, - implicit val loadBalancer: LoadBalancerService, + implicit val loadBalancer: LoadBalancer, implicit val cacheChangeNotification: Some[CacheChangeNotification], implicit val activationStore: ActivationStore, implicit val logStore: LogStore, @@ -243,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -266,7 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( implicit override val entityStore: EntityStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val 
executionContext: ExecutionContext, override val logging: Logging, @@ -279,7 +279,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entityStore: EntityStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -293,7 +293,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entitlementProvider: EntitlementProvider, override val activationStore: ActivationStore, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -310,7 +310,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val actorSystem: ActorSystem, override val executionContext: ExecutionContext, override val logging: Logging, diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala similarity index 82% rename from core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala rename to core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala index 89ba900..ffd831d 100644 --- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala @@ -19,82 +19,43 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets -import scala.annotation.tailrec -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import org.apache.kafka.clients.producer.RecordMetadata -import akka.actor.ActorRefFactory -import akka.actor.ActorSystem -import akka.actor.Props +import akka.actor.{ActorRefFactory, ActorSystem, Props} import akka.cluster.Cluster -import akka.util.Timeout import akka.pattern.ask -import whisk.common.Logging -import whisk.common.LoggingMarkers -import whisk.common.TransactionId -import whisk.core.ConfigKeys -import whisk.core.WhiskConfig +import akka.stream.ActorMaterializer +import akka.util.Timeout +import org.apache.kafka.clients.producer.RecordMetadata +import pureconfig._ +import whisk.common.{Logging, LoggingMarkers, TransactionId} import whisk.core.WhiskConfig._ -import whisk.core.connector.{ActivationMessage, CompletionMessage} -import whisk.core.connector.MessageFeed -import whisk.core.connector.MessageProducer -import whisk.core.connector.MessagingProvider +import whisk.core.connector._ import whisk.core.database.NoDocumentException import whisk.core.entity._ -import whisk.core.entity.{ActivationId, WhiskActivation} -import whisk.core.entity.EntityName -import whisk.core.entity.ExecutableWhiskActionMetaData -import whisk.core.entity.Identity -import whisk.core.entity.InstanceId -import whisk.core.entity.UUID -import whisk.core.entity.WhiskAction import whisk.core.entity.types.EntityStore +import whisk.core.{ConfigKeys, WhiskConfig} import whisk.spi.SpiLoader -import pureconfig._ -case class LoadbalancerConfig(blackboxFraction: Double, 
invokerBusyThreshold: Int) - -trait LoadBalancer { - - val activeAckTimeoutGrace = 1.minute - - /** Gets the number of in-flight activations for a specific user. */ - def activeActivationsFor(namespace: UUID): Future[Int] - - /** Gets the number of in-flight activations in the system. */ - def totalActiveActivations: Future[Int] - - /** - * Publishes activation message on internal bus for an invoker to pick up. - * - * @param action the action to invoke - * @param msg the activation message to publish on an invoker topic - * @param transid the transaction id for the request - * @return result a nested Future the outer indicating completion of publishing and - * the inner the completion of the action (i.e., the result) - * if it is ready before timeout (Right) otherwise the activation id (Left). - * The future is guaranteed to complete within the declared action time limit - * plus a grace period (see activeAckTimeoutGrace). - */ - def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( - implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} -} +case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int) -class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)( - implicit val actorSystem: ActorSystem, - logging: Logging) +class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit val actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer) extends LoadBalancer { private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer) + /** Used to manage an action for testing invoker health */ + private val entityStore = WhiskEntityStore.datastore(config) + /** The 
execution context for futures */ - implicit val executionContext: ExecutionContext = actorSystem.dispatcher + private implicit val executionContext: ExecutionContext = actorSystem.dispatcher + + private val activeAckTimeoutGrace = 1.minute /** How many invokers are dedicated to blackbox images. We range bound to something sensical regardless of configuration. */ private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction)) @@ -113,10 +74,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } } - override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace) - - override def totalActiveActivations = loadBalancerData.totalActivationCount - override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { chooseInvoker(msg.user, action).flatMap { invokerName => @@ -127,11 +84,16 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } } - /** An indexed sequence of all invokers in the current system */ - def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] = + /** An indexed sequence of all invokers in the current system. */ + override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = { invokerPool .ask(GetStatus)(Timeout(5.seconds)) - .mapTo[IndexedSeq[(InstanceId, InvokerState)]] + .mapTo[IndexedSeq[InvokerHealth]] + } + + override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace) + + override def totalActiveActivations = loadBalancerData.totalActivationCount /** * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives. 
@@ -307,9 +269,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */ private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt) - /** Return invokers (almost) dedicated to running blackbox actions. */ - private def blackboxInvokers( - invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = { + /** Return invokers dedicated to running blackbox actions. */ + private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = { val blackboxes = numBlackbox(invokers.size) invokers.takeRight(blackboxes) } @@ -318,8 +279,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore * Return (at least one) invokers for running non black-box actions. * This set can overlap with the blackbox set if there is only one invoker. 
*/ - private def managedInvokers( - invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = { + private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = { val managed = Math.max(1, invokers.length - numBlackbox(invokers.length)) invokers.take(managed) } @@ -329,14 +289,14 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore val hash = generateHash(user.namespace, action) loadBalancerData.activationCountPerInvoker.flatMap { currentActivations => - allInvokers.flatMap { invokers => + invokerHealth().flatMap { invokers => val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers) val invokersWithUsage = invokersToUse.view.map { // Using a view defers the comparably expensive lookup to actual access of the element - case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0)) + case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(instance.toString, 0)) } - LoadBalancerService.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match { + ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match { case Some(invoker) => Future.successful(invoker) case None => logging.error(this, s"all invokers down")(TransactionId.invokerHealth) @@ -352,7 +312,13 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } } -object LoadBalancerService { +object ContainerPoolBalancer extends LoadBalancerProvider { + + override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance) + def requiredProperties = kafkaHosts ++ Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null) @@ -367,7 +333,7 @@ object LoadBalancerService { def gcd(a: Int, b: 
Int): Int = if (b == 0) a else gcd(b, a % b) /** Returns pairwise coprime numbers until x. Result is memoized. */ - val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = LoadBalancerService.memoize { + val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize { case x => (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => { if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) { @@ -394,7 +360,7 @@ object LoadBalancerService { val numInvokers = invokers.size if (numInvokers > 0) { val homeInvoker = hash % numInvokers - val stepSizes = LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers) + val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers) val step = stepSizes(hash % stepSizes.size) val invokerProgression = Stream diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index cd80a8e..13c3a70 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -86,7 +86,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, // from leaking the state for external mutation var instanceToRef = immutable.Map.empty[InstanceId, ActorRef] var refToInstance = immutable.Map.empty[ActorRef, InstanceId] - var status = IndexedSeq[(InstanceId, InvokerState)]() + var status = IndexedSeq[InvokerHealth]() def receive = { case p: PingMessage => @@ -103,13 +103,13 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, case CurrentState(invoker, currentState: InvokerState) => refToInstance.get(invoker).foreach { instance => - status = status.updated(instance.toInt, (instance, currentState)) + status = status.updated(instance.toInt, new InvokerHealth(instance, currentState)) } logStatus() case Transition(invoker, oldState: 
InvokerState, newState: InvokerState) => refToInstance.get(invoker).foreach { instance => - status = status.updated(instance.toInt, (instance, newState)) + status = status.updated(instance.toInt, new InvokerHealth(instance, newState)) } logStatus() @@ -118,7 +118,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, } def logStatus() = { - val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" } + val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}") logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}") } @@ -155,7 +155,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, def registerInvoker(instanceId: InstanceId): ActorRef = { logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth) - status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i), Offline)) + status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InstanceId(i), Offline)) val ref = childFactory(context, instanceId) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala new file mode 100644 index 0000000..8f2227f --- /dev/null +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.loadBalancer + +import scala.concurrent.Future +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import whisk.common.{Logging, TransactionId} +import whisk.core.WhiskConfig +import whisk.core.connector._ +import whisk.core.entity._ +import whisk.spi.Spi + +/** + * Describes an abstract invoker. An invoker is a local container pool manager that + * is in charge of the container life cycle management. + * + * @param id a unique instance identifier for the invoker + * @param status its status (healthy, unhealthy, offline) + */ +class InvokerHealth(val id: InstanceId, val status: InvokerState) { + override def equals(obj: scala.Any): Boolean = obj match { + case that: InvokerHealth => that.id == this.id && that.status == this.status + case _ => false + } +} + +trait LoadBalancer { + + /** + * Publishes activation message on internal bus for an invoker to pick up. + * + * @param action the action to invoke + * @param msg the activation message to publish on an invoker topic + * @param transid the transaction id for the request + * @return result a nested Future the outer indicating completion of publishing and + * the inner the completion of the action (i.e., the result) + * if it is ready before timeout (Right) otherwise the activation id (Left). + * The future is guaranteed to complete within the declared action time limit + * plus a grace period (see activeAckTimeoutGrace). 
+ */ + def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] + + /** + * Returns a message indicating the health of the containers and/or container pool in general. + * + * @return a Future[IndexedSeq[InvokerHealth]] representing the health of the pools managed by the loadbalancer. + */ + def invokerHealth(): Future[IndexedSeq[InvokerHealth]] + + /** Gets the number of in-flight activations for a specific user. */ + def activeActivationsFor(namespace: UUID): Future[Int] + + /** Gets the number of in-flight activations in the system. */ + def totalActiveActivations: Future[Int] +} + +/** + * An Spi for providing load balancer implementations. + */ +trait LoadBalancerProvider extends Spi { + def requiredProperties: Map[String, String] + + def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): LoadBalancer +} diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index f0c9fe5..7344ba7 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -200,4 +200,5 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC } getOrElse Future.failed(new IllegalArgumentException("Unit test does not need fast path")) } + override def invokerHealth() = Future.successful(IndexedSeq.empty) } diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala similarity index 68% rename from tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala rename to 
tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala index 3f3dca4..60eda84 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala @@ -21,7 +21,7 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.FlatSpec import org.scalatest.Matchers -import whisk.core.loadBalancer.LoadBalancerService +import whisk.core.loadBalancer.ContainerPoolBalancer import whisk.core.loadBalancer.Healthy import whisk.core.loadBalancer.Offline import whisk.core.loadBalancer.UnHealthy @@ -34,12 +34,12 @@ import whisk.core.entity.InstanceId * of the ContainerPool object. */ @RunWith(classOf[JUnitRunner]) -class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { +class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers { behavior of "memoize" it should "not recompute a value which was already given" in { var calls = 0 - val add1: Int => Int = LoadBalancerService.memoize { + val add1: Int => Int = ContainerPoolBalancer.memoize { case second => calls += 1 1 + second @@ -58,18 +58,18 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { behavior of "pairwiseCoprimeNumbersUntil" it should "return an empty set for malformed inputs" in { - LoadBalancerService.pairwiseCoprimeNumbersUntil(0) shouldBe Seq() - LoadBalancerService.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq() + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq() + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq() } it should "return all coprime numbers until the number given" in { - LoadBalancerService.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1) - LoadBalancerService.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1) - LoadBalancerService.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2) - LoadBalancerService.pairwiseCoprimeNumbersUntil(4) 
shouldBe Seq(1, 3) - LoadBalancerService.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3) - LoadBalancerService.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7) - LoadBalancerService.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7) + ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7) } behavior of "chooseInvoker" @@ -78,24 +78,24 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size) it should "return None on an empty invokers list" in { - LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None + ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None } it should "return None on a list of offline/unhealthy invokers" in { val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0)) - LoadBalancerService.schedule(invs, 0, 1) shouldBe None + ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None } it should "schedule to the home invoker" in { val invs = invokers(10) val hash = 2 - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size)) + ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size)) } it should "take the only online invoker" in { - LoadBalancerService.schedule( + ContainerPoolBalancer.schedule( IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)), 0, 1) shouldBe Some(InstanceId(2)) @@ -105,7 +105,7 @@ class LoadBalancerServiceObjectTests extends 
FlatSpec with Matchers { val hash = 0 val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)) - LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2)) + ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2)) } it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in { @@ -114,9 +114,9 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { val targetInvoker = hash % invokerCount val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1)) - val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) + val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size)) + ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size)) } it should "wrap the search at the end of the invoker list" in { @@ -125,12 +125,12 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { val hash = 1 val targetInvoker = hashInto(invs, hash) // will be invoker1 - val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2 + val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2 step shouldBe 2 // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size)) + ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size)) } it should "multiply its threshold in 3 iterations to find an invoker with a good 
warm-chance" in { @@ -140,22 +140,22 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { // even though invoker1 is not the home invoker in this case, it gets chosen over // the others because it's the first one encountered by the iteration mechanism to be below // the threshold of 3 * 16 invocations - LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) + ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) } it should "choose the home invoker if all invokers are overloaded even above the muliplied threshold" in { val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1), Healthy, 50), (InstanceId(2), Healthy, 49)) val hash = 0 // home is 0, stepsize is 1 - LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) + ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) } it should "transparently work with partitioned sets of invokers" in { val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0)) - LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3)) - LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4)) - LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5)) - LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3)) + ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3)) + ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4)) + ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5)) + ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3)) } } diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index 8e7d291..6f0d05c 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -68,6 +68,7 @@ import whisk.core.loadBalancer.InvokerPool import whisk.core.loadBalancer.InvokerState import whisk.core.loadBalancer.Offline import whisk.core.loadBalancer.UnHealthy +import whisk.core.loadBalancer.InvokerHealth import whisk.utils.retry import whisk.core.connector.test.TestConnector import whisk.core.entitlement.Privilege @@ -101,7 +102,7 @@ class InvokerSupervisionTests /** Helper to generate a list of (InstanceId, InvokerState) */ def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map { - case (state, index) => (InstanceId(index), state) + case (state, index) => new InvokerHealth(InstanceId(index), state) } val pC = new TestConnector("pingFeedTtest", 4, false) {} -- To stop receiving notification emails like this one, please contact tysonnor...@apache.org.