markusthoemmes closed pull request #2932: refactoring loadbalancing service with an overflow queue URL: https://github.com/apache/incubator-openwhisk/pull/2932
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/ansible/environments/docker-machine/group_vars/all b/ansible/environments/docker-machine/group_vars/all index 20f7228c94..21c5e75684 100644 --- a/ansible/environments/docker-machine/group_vars/all +++ b/ansible/environments/docker-machine/group_vars/all @@ -29,3 +29,9 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre invoker_arguments: "{{ controller_arguments }}" invoker_allow_multiple_instances: true + + +limit_invocations_per_minute: 60000 +limit_invocations_concurrent: 30000 +limit_invocations_concurrent_system: 50000 +limit_fires_per_minute: 60 \ No newline at end of file diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all index d99d85eb23..8dbbe14082 100755 --- a/ansible/environments/local/group_vars/all +++ b/ansible/environments/local/group_vars/all @@ -23,3 +23,8 @@ controller_arguments: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxre invoker_arguments: "{{ controller_arguments }}" invoker_allow_multiple_instances: true + +limit_invocations_per_minute: 60000 +limit_invocations_concurrent: 30000 +limit_invocations_concurrent_system: 50000 +limit_fires_per_minute: 60 \ No newline at end of file diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index 3059f6b051..742397f53d 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -18,7 +18,6 @@ package whisk.core.connector import scala.util.Try - import spray.json._ import whisk.common.TransactionId import whisk.core.entity.ActivationId @@ -118,3 +117,28 @@ object 
PingMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) implicit val serdes = jsonFormat(PingMessage.apply _, "name") } + +case class OverflowMessage(override val transid: TransactionId, + msg: ActivationMessage, + actionTimeoutSeconds: Int, + hash: Int, + pull: Boolean, + originalController: InstanceId) + extends Message { +// def meta = +// JsObject("meta" -> { +// cause map { c => +// JsObject(c.toJsObject.fields ++ msg.toJsObject.fields) +// } getOrElse { +// activationId.toJsObject +// } +// }) + override def serialize: String = { + OverflowMessage.serdes.write(this).compactPrint + } +} + +object OverflowMessage extends DefaultJsonProtocol { + def parse(msg: String): Try[OverflowMessage] = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat6(OverflowMessage.apply) +} diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala index 31fcea09ba..ff1208bfb6 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala @@ -17,15 +17,14 @@ package whisk.core.connector +import akka.actor.ActorRef import scala.annotation.tailrec import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.blocking import scala.concurrent.duration._ import scala.util.Failure - import org.apache.kafka.clients.consumer.CommitFailedException - import akka.actor.FSM import akka.pattern.pipe import whisk.common.Logging @@ -72,6 +71,9 @@ object MessageFeed { /** Steady state message, indicates capacity in downstream process to receive more messages. */ object Processed + /** message to indicate max offset is reached */ + object MaxOffset + /** Indicates the fill operation has completed. 
*/ private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])]) } @@ -99,7 +101,8 @@ class MessageFeed(description: String, longPollDuration: FiniteDuration, handler: Array[Byte] => Future[Unit], autoStart: Boolean = true, - logHandoff: Boolean = true) + logHandoff: Boolean = true, + offsetMonitor: Option[ActorRef] = None) extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] { import MessageFeed._ @@ -181,6 +184,10 @@ class MessageFeed(description: String, // of the commit should be masked. val records = consumer.peek(longPollDuration) consumer.commit() + if (records.size < maxPipelineDepth) { + //reached the max offset + offsetMonitor.foreach(_ ! MaxOffset) + } FillCompleted(records.toSeq) } }.andThen { diff --git a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala index 6aa0f6e884..44592a445a 100644 --- a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala +++ b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala @@ -44,7 +44,8 @@ object SpiLoader { /** Lookup the classname for the SPI impl based on a key in the provided Config */ object TypesafeConfigClassResolver extends SpiClassResolver { - private val config = ConfigFactory.load() + //allow tests to inject a config + var config = ConfigFactory.load() override def getClassNameForType[T: Manifest]: String = config.getString("whisk.spi." 
+ manifest[T].runtimeClass.getSimpleName) diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index bb723bc627..285716d81c 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -23,6 +23,7 @@ import scala.concurrent.Future import scala.util.{Failure, Success} import akka.Done import akka.actor.ActorSystem +import akka.cluster.Cluster import akka.actor.CoordinatedShutdown import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.Uri @@ -39,13 +40,18 @@ import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.WhiskConfig +import whisk.core.connector.MessagingProvider import whisk.core.database.RemoteCacheInvalidation import whisk.core.database.CacheChangeNotification import whisk.core.entitlement._ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.ExecManifest.Runtimes -import whisk.core.loadBalancer.{LoadBalancerService} +import whisk.core.loadBalancer.DistributedLoadBalancerData +import whisk.core.loadBalancer.LoadBalancerActorService +import whisk.core.loadBalancer.LoadBalancerService +import whisk.core.loadBalancer.LocalLoadBalancerData +import whisk.core.loadBalancer.StaticSeedNodesProvider import whisk.http.BasicHttpService import whisk.http.BasicRasService import whisk.spi.SpiLoader @@ -114,7 +120,30 @@ class Controller(val instance: InstanceId, }) // initialize backend services - private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore) + + /** Feature switch for shared load balancer data **/ + private val loadBalancerData = { + if (whiskConfig.controllerLocalBookkeeping) { + new LocalLoadBalancerData() + } else { + + /** Specify how seed nodes are 
generated */ + val seedNodesProvider = new StaticSeedNodesProvider(whiskConfig.controllerSeedNodes, actorSystem.name) + Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes()) + new DistributedLoadBalancerData(instance) + } + } + private val messagingProvider = SpiLoader.get[MessagingProvider] + val maxPingsPerPoll = 128 + val pingConsumer = + messagingProvider.getConsumer(whiskConfig, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll) + private val messageProducer = messagingProvider.getProducer(whiskConfig, actorSystem.dispatcher) + private implicit val loadBalancer = new LoadBalancerActorService( + whiskConfig, + instance, + LoadBalancerService + .createInvokerPool(instance, actorSystem, actorSystem.dispatcher, entityStore, messageProducer, pingConsumer), + loadBalancerData) private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer) private implicit val activationIdFactory = new ActivationIdGenerator {} private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem) diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala index 8015f60d91..84e5c5517a 100644 --- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala +++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala @@ -18,7 +18,6 @@ package whisk.core.controller import scala.concurrent.ExecutionContext - import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.Uri @@ -27,7 +26,6 @@ import akka.http.scaladsl.server.Route import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.stream.ActorMaterializer - import spray.json._ import spray.json.DefaultJsonProtocol._ import whisk.core.database.CacheChangeNotification @@ -42,7 +40,7 @@ import whisk.core.entity._ import 
whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.WhiskAuthStore import whisk.core.entity.types._ -import whisk.core.loadBalancer.LoadBalancerService +import whisk.core.loadBalancer.LoadBalancer /** * Abstract class which provides basic Directives which are used to construct route structures @@ -140,7 +138,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( implicit val entityStore: EntityStore, implicit val entitlementProvider: EntitlementProvider, implicit val activationIdFactory: ActivationIdGenerator, - implicit val loadBalancer: LoadBalancerService, + implicit val loadBalancer: LoadBalancer, implicit val cacheChangeNotification: Some[CacheChangeNotification], implicit val activationStore: ActivationStore, implicit val logStore: LogStore, @@ -222,7 +220,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -245,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( implicit override val entityStore: EntityStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -258,7 +256,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entityStore: EntityStore, override val entitlementProvider: 
EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -272,7 +270,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entitlementProvider: EntitlementProvider, override val activationStore: ActivationStore, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, @@ -289,7 +287,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, - override val loadBalancer: LoadBalancerService, + override val loadBalancer: LoadBalancer, override val actorSystem: ActorSystem, override val executionContext: ExecutionContext, override val logging: Logging, diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala index 34b5d6708f..e2257eed87 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala @@ -17,15 +17,17 @@ package whisk.core.loadBalancer +import akka.actor.Actor +import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.Props import akka.util.Timeout -import akka.pattern.ask -import whisk.common.Logging -import whisk.core.entity.{ActivationId, UUID} - 
-import scala.collection.concurrent.TrieMap -import scala.concurrent.Future +import scala.collection.mutable import scala.concurrent.duration._ +import whisk.common.Logging +import whisk.core.entity.ActivationId +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID /** * Encapsulates data used for loadbalancer and active-ack bookkeeping. @@ -33,53 +35,99 @@ import scala.concurrent.duration._ * Note: The state keeping is backed by distributed akka actors. All CRUDs operations are done on local values, thus * a stale value might be read. */ -class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData { +class DistributedLoadBalancerData(instance: InstanceId, monitor: Option[ActorRef] = None)( + implicit actorSystem: ActorSystem, + logging: Logging) + extends LoadBalancerData { implicit val timeout = Timeout(5.seconds) implicit val executionContext = actorSystem.dispatcher - private val activationsById = TrieMap[ActivationId, ActivationEntry]() + private val overflowKey = "overflow" + private val activationsById = mutable.Map[ActivationId, ActivationEntry]() + + private val localData = new LocalLoadBalancerData() + private var sharedDataInvokers = Map[String, Map[Int, Int]]() + private var sharedDataNamespaces = Map[String, Map[Int, Int]]() + private var sharedDataOverflow = Map[String, Map[Int, Int]]() + + private val updateMonitor = actorSystem.actorOf(Props(new Actor { + override def receive = { + case Updated(storageName, entries) => + monitor.foreach(_ ! 
Updated(storageName, entries)) + storageName match { + case "Invokers" => sharedDataInvokers = entries + case "Namespaces" => sharedDataNamespaces = entries + case "Overflow" => sharedDataOverflow = entries + } + } + })) private val sharedStateInvokers = actorSystem.actorOf( - SharedDataService.props("Invokers"), + SharedDataService.props("Invokers", updateMonitor), name = "SharedDataServiceInvokers" + UUID()) private val sharedStateNamespaces = actorSystem.actorOf( - SharedDataService.props("Namespaces"), + SharedDataService.props("Namespaces", updateMonitor), name = "SharedDataServiceNamespaces" + UUID()) - - def totalActivationCount = - (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt) - - def activationCountOn(namespace: UUID): Future[Int] = { - (sharedStateNamespaces ? GetMap) - .mapTo[Map[String, BigInt]] - .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0)) + private val sharedStateOverflow = actorSystem.actorOf( + SharedDataService.props("Overflow", updateMonitor), + name = + "SharedDataServiceOverflow" + UUID()) + def totalActivationCount = { + val shared = sharedDataInvokers.values.flatten.filter(_._1 != instance.toInt).map(_._2).sum + shared + localData.totalActivationCount + } + def activationCountOn(namespace: UUID): Int = { + val shared = sharedDataNamespaces.getOrElse(namespace.toString, Map()).filter(_._1 != instance.toInt).values.sum + shared + localData.activationCountOn(namespace) } - def activationCountPerInvoker: Future[Map[String, Int]] = { - (sharedStateInvokers ? 
GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt)) + def activationCountPerInvoker: Map[String, Int] = { + val shared = sharedDataInvokers.mapValues(_.filter(_._1 != instance.toInt).values.sum) + val local = localData.activationCountPerInvoker + local ++ shared.map { case (k, v) => k -> (v + local.getOrElse(k, 0)) } } def activationById(activationId: ActivationId): Option[ActivationEntry] = { - activationsById.get(activationId) + localData.activationById(activationId) + //NOTE: activations are NOT replicated, only the counters } - def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = { + def putActivation(id: ActivationId, update: => ActivationEntry, isOverflow: Boolean = false): ActivationEntry = { activationsById.getOrElseUpdate(id, { val entry = update - sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1) - sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1) + //update the shared stats + //if we are processing an overflow message, do NOT double count against namespace + if (!isOverflow) { + sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, instance, 1) + } + //if the initial processing msg is routed to invoker, increase the invoker counter + //otherwise increase the overflow counter + entry.invokerName match { + case Some(i) => sharedStateInvokers ! IncreaseCounter(i.toString, instance, 1) + case None => sharedStateOverflow ! IncreaseCounter(overflowKey, instance, 1) + } logging.debug(this, "increased shared counters") + //store the activation + localData.putActivation(id, entry) entry }) } def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = { activationsById.remove(entry.id).map { activationEntry => - sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1) - sharedStateNamespaces ! 
DecreaseCounter(entry.namespaceId.asString, 1) - logging.debug(this, "decreased shared counters") + if (!activationEntry.isOverflow) { + //update the shared stats + sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, instance, 1) + } + entry.invokerName match { + case Some(i) => sharedStateInvokers ! DecreaseCounter(i.toString, instance, 1) + case None => sharedStateOverflow ! DecreaseCounter(overflowKey, instance, 1) + } + logging.debug(this, s"decreased shared counters ") + //remove the activation + localData.removeActivation(entry) activationEntry } } @@ -87,4 +135,13 @@ class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Lo def removeActivation(aid: ActivationId): Option[ActivationEntry] = { activationsById.get(aid).flatMap(removeActivation) } + + /** + * Get the number of activations waiting at the overflow queue + * @return + */ + override def overflowActivationCount = { + val shared = sharedDataOverflow.values.flatten.filter(_._1 != instance.toInt).map(_._2).sum + shared + localData.overflowActivationCount + } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 13517de5f3..564f74ca6f 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -18,7 +18,6 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets - import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration._ @@ -34,19 +33,25 @@ import akka.actor.FSM.SubscribeTransitionCallBack import akka.actor.FSM.Transition import akka.actor.Props import akka.pattern.pipe +//import akka.pattern.ask import akka.util.Timeout +import scala.concurrent.ExecutionContext import whisk.common.AkkaLogging +import whisk.common.Logging import whisk.common.LoggingMarkers import 
whisk.common.RingBuffer import whisk.common.TransactionId import whisk.core.connector._ +import whisk.core.database.NoDocumentException import whisk.core.entitlement.Privilege import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity._ +import whisk.core.entity.types.EntityStore // Received events case object GetStatus - +case class SubscribeLoadBalancer(loadBalancerActor: ActorRef) +case class StatusUpdate(status: IndexedSeq[(InstanceId, InvokerState)]) case object Tick // States an Invoker can be in @@ -87,6 +92,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, val instanceToRef = mutable.Map[InstanceId, ActorRef]() val refToInstance = mutable.Map[ActorRef, InstanceId]() var status = IndexedSeq[(InstanceId, InvokerState)]() + var lbActor: Option[ActorRef] = None def receive = { case p: PingMessage => @@ -94,7 +100,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, logging.info(this, s"registered a new invoker: invoker${p.instance.toInt}")(TransactionId.invokerHealth) status = padToIndexed(status, p.instance.toInt + 1, i => (InstanceId(i), Offline)) - + lbActor.foreach(_ ! StatusUpdate(status)) val ref = childFactory(context, p.instance) ref ! SubscribeTransitionCallBack(self) // register for state change events @@ -105,6 +111,10 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, case GetStatus => sender() ! status + case SubscribeLoadBalancer(lb) => + lbActor = Some(lb) + lb ! StatusUpdate(status) + case msg: InvocationFinishedMessage => { // Forward message to invoker, if InvokerActor exists instanceToRef.get(msg.invokerInstance).map(_.forward(msg)) @@ -113,12 +123,14 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, case CurrentState(invoker, currentState: InvokerState) => refToInstance.get(invoker).foreach { instance => status = status.updated(instance.toInt, (instance, currentState)) + lbActor.foreach(_ ! 
StatusUpdate(status)) } logStatus() case Transition(invoker, oldState: InvokerState, newState: InvokerState) => refToInstance.get(invoker).foreach { instance => status = status.updated(instance.toInt, (instance, newState)) + lbActor.foreach(_ ! StatusUpdate(status)) } logStatus() @@ -181,6 +193,29 @@ object InvokerPool { name = EntityName(s"invokerHealthTestAction${i.toInt}"), exec = new CodeExecAsString(manifest, """function main(params) { return params; }""", None)) } + + /** + * Creates or updates a health test action by updating the entity store. + * This method is intended for use on startup. + * @return Future that completes successfully iff the action is added to the database + */ + def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction)(implicit logging: Logging, + ec: ExecutionContext): Future[Unit] = { + implicit val tid = TransactionId.loadbalancer + WhiskAction + .get(db, action.docid) + .flatMap { oldAction => + WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None) + } + .recover { + case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None) + } + .map(_ => {}) + .andThen { + case Success(_) => logging.info(this, "test action for invoker health now exists") + case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e") + } + } } /** diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala index 1866d2dcba..0972082d69 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala @@ -17,18 +17,20 @@ package whisk.core.loadBalancer -import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation} - -import scala.concurrent.{Future, Promise} +import whisk.core.entity.{ActivationId, UUID, WhiskActivation} +import 
scala.concurrent.{Promise} +import whisk.core.entity.InstanceId case class ActivationEntry(id: ActivationId, namespaceId: UUID, - invokerName: InstanceId, - promise: Promise[Either[ActivationId, WhiskActivation]]) + var invokerName: Option[InstanceId], + promise: Promise[Either[ActivationId, WhiskActivation]], + originalController: Option[InstanceId] = None, + isOverflow: Boolean = false) trait LoadBalancerData { /** Get the number of activations across all namespaces. */ - def totalActivationCount: Future[Int] + def totalActivationCount: Int /** * Get the number of activations for a specific namespace. @@ -36,14 +38,20 @@ trait LoadBalancerData { * @param namespace The namespace to get the activation count for * @return a map (namespace -> number of activations in the system) */ - def activationCountOn(namespace: UUID): Future[Int] + def activationCountOn(namespace: UUID): Int /** * Get the number of activations for each invoker. * * @return a map (invoker -> number of activations queued for the invoker) */ - def activationCountPerInvoker: Future[Map[String, Int]] + def activationCountPerInvoker: Map[String, Int] + + /** + * Get the number of activations waiting at the overflow queue + * @return + */ + def overflowActivationCount: Int /** * Get an activation entry for a given activation id. @@ -60,10 +68,11 @@ trait LoadBalancerData { * @param update block calculating the entry to add. * Note: This is evaluated iff the entry * didn't exist before. + * @param isOverflow true if this activation should count against user rates (otherwise only counts for invoker stats) * @return the entry calculated by the block or iff it did * exist before the entry from the state */ - def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry + def putActivation(id: ActivationId, update: => ActivationEntry, isOverflow: Boolean = false): ActivationEntry /** * Removes the given entry. 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 0b5a06d385..55d0dc1da2 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -17,11 +17,11 @@ package whisk.core.loadBalancer +import akka.actor.Actor +import akka.actor.ActorRef import java.nio.charset.StandardCharsets - import scala.annotation.tailrec import scala.concurrent.Await -import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.DurationInt @@ -30,24 +30,29 @@ import scala.util.Success import org.apache.kafka.clients.producer.RecordMetadata import akka.actor.ActorRefFactory import akka.actor.ActorSystem +import akka.actor.Cancellable import akka.actor.Props -import akka.cluster.Cluster import akka.util.Timeout import akka.pattern.ask +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.WhiskConfig._ +import whisk.core.connector.MessageConsumer import whisk.core.connector.{ActivationMessage, CompletionMessage} import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer import whisk.core.connector.MessagingProvider +import whisk.core.connector.OverflowMessage import whisk.core.database.NoDocumentException import whisk.core.entity.{ActivationId, WhiskActivation} import whisk.core.entity.EntityName import whisk.core.entity.ExecutableWhiskAction -import whisk.core.entity.Identity import whisk.core.entity.InstanceId import whisk.core.entity.UUID import whisk.core.entity.WhiskAction @@ -56,8 
+61,6 @@ import whisk.spi.SpiLoader trait LoadBalancer { - val activeAckTimeoutGrace = 1.minute - /** Gets the number of in-flight activations for a specific user. */ def activeActivationsFor(namespace: UUID): Future[Int] @@ -80,62 +83,158 @@ trait LoadBalancer { implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] } +case class Publish(action: ExecutableWhiskAction, msg: ActivationMessage) -class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)( - implicit val actorSystem: ActorSystem, - logging: Logging) +class LoadBalancerActorService( + config: WhiskConfig, + instance: InstanceId, + invokerPool: ActorRef, + loadBalancerData: LoadBalancerData)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer { /** The execution context for futures */ implicit val executionContext: ExecutionContext = actorSystem.dispatcher + /** Gets a producer which can publish messages to the kafka bus. */ + private val messagingProvider = SpiLoader.get[MessagingProvider] + private val messageProducer = messagingProvider.getProducer(config, executionContext) + + /** + * Subscribes to active acks (completion messages from the invokers), and + * registers a handler for received active acks from invokers. + */ + val maxActiveAcksPerPoll = 128 + val activeAckConsumer = + messagingProvider.getConsumer(config, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll) + + val maxOverflowMsgPerPoll = config.loadbalancerInvokerBusyThreshold //TODO: only pull enough messages that can be processed immediately + val overflowConsumer = + messagingProvider.getConsumer(config, "overflow", s"overflow", maxPeek = maxOverflowMsgPerPoll) + + /** setup the LoadBalancerActor */ + val lbActor = actorSystem.actorOf( + Props(new LoadBalancerActor(config, instance, invokerPool, activeAckConsumer, overflowConsumer, loadBalancerData))) + invokerPool ! 
SubscribeLoadBalancer(lbActor) + + implicit val timeout = Timeout(30.seconds) + + /** Gets the number of in-flight activations for a specific user. */ + override def activeActivationsFor(namespace: UUID) = Future.successful(0) + + /** Gets the number of in-flight activations in the system. */ + override def totalActiveActivations = Future.successful(0) + + /** + * Publishes activation message on internal bus for an invoker to pick up. + * + * @param action the action to invoke + * @param msg the activation message to publish on an invoker topic + * @param transid the transaction id for the request + * @return result a nested Future the outer indicating completion of publishing and + * the inner the completion of the action (i.e., the result) + * if it is ready before timeout (Right) otherwise the activation id (Left). + * The future is guaranteed to complete within the declared action time limit + * plus a grace period (see activeAckTimeoutGrace). + */ + override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(implicit transid: TransactionId) = { + val res = lbActor.ask(Publish(action, msg)).mapTo[Future[Either[ActivationId, WhiskActivation]]] + res //Future.successful(res) + } + + def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] = + invokerPool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]] +} + +class LoadBalancerActor(config: WhiskConfig, + instance: InstanceId, + invokerPool: ActorRef, + activeAckConsumer: MessageConsumer, + overflowConsumer: MessageConsumer, + val loadBalancerData: LoadBalancerData)(implicit logging: Logging) + extends Actor { + override def postStop() { + activeAckConsumer.close() + overflowConsumer.close() + } + implicit val actorSystem = context.system + implicit val ec = context.dispatcher + val activeAckTimeoutGrace = 1.minute + + var allInvokersLocal = IndexedSeq[(InstanceId, InvokerState)]() + val countersLocal = mutable.Map[InstanceId, Int]() + var localOverflowActivationCount: Int = 0 + 
val overflowState = new AtomicBoolean(false) + /** How many invokers are dedicated to blackbox images. We range bound to something sensical regardless of configuration. */ private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, config.controllerBlackboxFraction)) logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer) - /** Feature switch for shared load balancer data **/ - private val loadBalancerData = { - if (config.controllerLocalBookkeeping) { - new LocalLoadBalancerData() - } else { + override def receive = { + case Publish(action, msg) => + sender() ! publish(action, msg)(msg.transid) + case StatusUpdate(invokers) => + allInvokersLocal = invokers + } - /** Specify how seed nodes are generated */ - val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name) - Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes()) - new DistributedLoadBalancerData() + def publish(action: ExecutableWhiskAction, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { + val hash = generateHash(msg.user.namespace, action) + if (!overflowState.get()) { + sendToInvokerOrOverflow(msg, action, hash, action.exec.pull) + } else { + sendActivationToOverflow( + messageProducer, + OverflowMessage(transid, msg, action.limits.timeout.duration.toSeconds.toInt, hash, action.exec.pull, instance)) + .flatMap { _ => + val entry = setupActivation(action.limits.timeout.duration, msg.activationId, msg.user.uuid, None, transid) + entry.promise.future + } } + } - override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace) + /** Generates a hash based on the string representation of namespace and action */ + private def generateHash(namespace: EntityName, action: ExecutableWhiskAction): Int = { + (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs + } - override def 
totalActiveActivations = loadBalancerData.totalActivationCount + private def sendToInvokerOrOverflow(msg: ActivationMessage, action: ExecutableWhiskAction, hash: Int, pull: Boolean)( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { + val invMatched = chooseInvoker(hash, pull, false) + val entry = setupActivation(action.limits.timeout.duration, msg.activationId, msg.user.uuid, invMatched, transid) - override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)( - implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { - chooseInvoker(msg.user, action).flatMap { invokerName => - val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid) - sendActivationToInvoker(messageProducer, msg, invokerName).map { _ => - entry.promise.future - } + invMatched match { + case Some(i) => + LoadBalancerService.sendActivationToInvoker(messageProducer, msg, i).flatMap { _ => + entry.promise.future + } + case None => + if (overflowState.compareAndSet(false, true)) { + logging.info(this, "entering overflow state; no invokers have capacity") + } + + sendActivationToOverflow( + messageProducer, + OverflowMessage(transid, msg, action.limits.timeout.duration.toSeconds.toInt, hash, pull, instance)).flatMap { + _ => + entry.promise.future + } } } - /** An indexed sequence of all invokers in the current system */ - def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] = - invokerPool - .ask(GetStatus)(Timeout(5.seconds)) - .mapTo[IndexedSeq[(InstanceId, InvokerState)]] - /** * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives. * The promise is removed form the map when the result arrives or upon timeout. 
* - * @param msg is the kafka message payload as Json + * @param response the ActivationId OR the WhiskActivation response + * @param tid transaction id + * @param forced true if this activation was expired before the active ack + * @param invoker Some(InstanceId) if it was sent to invoker, or None if it was overflow (expired) */ private def processCompletion(response: Either[ActivationId, WhiskActivation], tid: TransactionId, forced: Boolean, - invoker: InstanceId): Unit = { + invoker: Option[InstanceId]): Unit = { val aid = response.fold(l => l, r => r.activationId) // treat left as success (as it is the result of a message exceeding the bus limit) @@ -143,13 +242,30 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore loadBalancerData.removeActivation(aid) match { case Some(entry) => + //cancel the scheduled timeout handler + timeouts.remove(aid).foreach(_.cancel()) logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) // Active acks that are received here are strictly from user actions - health actions are not part of // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. // If the active ack was forced, because the waiting period expired, treat it as a failed activation. // A cluster of such failures will eventually turn the invoker unhealthy and suspend queuing activations // to that invoker topic. - invokerPool ! InvocationFinishedMessage(invoker, isSuccess && !forced) + entry.invokerName.foreach(invokerInstance => { + invokerPool ! 
InvocationFinishedMessage(invokerInstance, isSuccess && !forced) + //if processing overflow that initiated elsewhere, propagate the completion + entry.originalController.foreach(controllerInstance => { + val msg = CompletionMessage(tid, response, invokerInstance) + messageProducer.send(s"completed${controllerInstance.toInt}", msg) + }) + }) + + //if this is an entry for processing overflow, adjust overflow state if needed + if (entry.isOverflow) { + localOverflowActivationCount -= 1 + if (overflowState.get() && localOverflowActivationCount == 0 && overflowState.compareAndSet(true, false)) { + logging.info(this, "removing overflow state after processing outstanding overflow messages") + } + } if (!forced) { entry.promise.trySuccess(response) } else { @@ -161,7 +277,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore // for activations that already timed out. // For both cases, it looks like the invoker works again and we should send the status of // the activation to the invokerPool. - invokerPool ! InvocationFinishedMessage(invoker, isSuccess) + invoker.foreach(invokerPool ! InvocationFinishedMessage(_, isSuccess)) logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) case None => // the entry has already been removed by an active ack. This part of the code is reached by the timeout. @@ -170,26 +286,38 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } } + val timeouts = mutable.Map[ActivationId, Cancellable]() + /** * Creates an activation entry and insert into various maps. 
*/ - private def setupActivation(action: ExecutableWhiskAction, + private def setupActivation(actionTimeout: FiniteDuration, activationId: ActivationId, namespaceId: UUID, - invokerName: InstanceId, - transid: TransactionId): ActivationEntry = { - val timeout = action.limits.timeout.duration + activeAckTimeoutGrace + invokerName: Option[InstanceId], + transid: TransactionId, + originalController: Option[InstanceId] = None, + isOverflow: Boolean = false): ActivationEntry = { + val timeout = actionTimeout + activeAckTimeoutGrace // Install a timeout handler for the catastrophic case where an active ack is not received at all // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); // in this case, if the activation handler is still registered, remove it and update the books. - loadBalancerData.putActivation(activationId, { - actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(Left(activationId), transid, forced = true, invoker = invokerName) - } - - ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]()) - }) + loadBalancerData.putActivation( + activationId, { + timeouts.put(activationId, actorSystem.scheduler.scheduleOnce(timeout) { + processCompletion(Left(activationId), transid, forced = true, invokerName) + }) + + ActivationEntry( + activationId, + namespaceId, + invokerName, + Promise[Either[ActivationId, WhiskActivation]], + originalController, + isOverflow) + }, + isOverflow) } /** @@ -215,50 +343,25 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } /** Gets a producer which can publish messages to the kafka bus. 
*/ - private val messagingProvider = SpiLoader.get[MessagingProvider] - private val messageProducer = messagingProvider.getProducer(config, executionContext) + val messagingProvider = SpiLoader.get[MessagingProvider] + private val messageProducer = messagingProvider.getProducer(config, context.dispatcher) - private def sendActivationToInvoker(producer: MessageProducer, - msg: ActivationMessage, - invoker: InstanceId): Future[RecordMetadata] = { + private def sendActivationToOverflow(producer: MessageProducer, msg: OverflowMessage): Future[RecordMetadata] = { implicit val transid = msg.transid - val topic = s"invoker${invoker.toInt}" + val topic = "overflow" val start = transid.started( this, LoggingMarkers.CONTROLLER_KAFKA, - s"posting topic '$topic' with activation id '${msg.activationId}'") + s"posting overflow topic '$topic' with activation id '${msg.msg.activationId}'") producer.send(topic, msg).andThen { case Success(status) => + localOverflowActivationCount += 1 transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]") case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic") } } - private val invokerPool = { - // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers. - InvokerPool - .healthAction(instance) - .map { - // Await the creation of the test action; on failure, this will abort the constructor which should - // in turn abort the startup of the controller. 
- a => - Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute) - } - .orElse { - throw new IllegalStateException( - "cannot create test action for invoker health because runtime manifest is not valid") - } - - val maxPingsPerPoll = 128 - val pingConsumer = - messagingProvider.getConsumer(config, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll) - val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => - f.actorOf(InvokerActor.props(invokerInstance, instance)) - - actorSystem.actorOf( - InvokerPool.props(invokerFactory, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer)) - } /** * Subscribes to active acks (completion messages from the invokers), and @@ -266,8 +369,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore */ val maxActiveAcksPerPoll = 128 val activeAckPollDuration = 1.second - private val activeAckConsumer = - messagingProvider.getConsumer(config, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll) val activationFeed = actorSystem.actorOf(Props { new MessageFeed( "activeack", @@ -282,7 +383,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore val raw = new String(bytes, StandardCharsets.UTF_8) CompletionMessage.parse(raw) match { case Success(m: CompletionMessage) => - processCompletion(m.response, m.transid, forced = false, invoker = m.invoker) + processCompletion(m.response, m.transid, forced = false, invoker = Some(m.invoker)) activationFeed ! 
MessageFeed.Processed case Failure(t) => @@ -290,6 +391,80 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore logging.error(this, s"failed processing message: $raw with $t") } } + // //TODO: only pull enough messages that can be processed immediately + val overflowPollDuration = 200.milliseconds + + val offsetMonitor = actorSystem.actorOf(Props { + new Actor { + override def receive = { + case MessageFeed.MaxOffset => + if (overflowState.compareAndSet(true, false)) { + logging.info(this, "resetting overflow state via offsetMonitor for overflow topic") + } + } + } + }) + + //ideally the overflow capacity should be dynamic, based on free invokers, to provide some backpressure. For now, capacity of 1 + //(or some small number less than number of invokers) may be ok. + val overflowHandlerCapacity = overflowConsumer.maxPeek + val overflowFeed = actorSystem.actorOf(Props { + new MessageFeed( + "overflow", + logging, + overflowConsumer, + overflowHandlerCapacity, + overflowPollDuration, + processOverflow, + offsetMonitor = Some(offsetMonitor)) + }) + + private def processOverflow(bytes: Array[Byte]): Future[Unit] = Future { + val raw = new String(bytes, StandardCharsets.UTF_8) + OverflowMessage.parse(raw) match { + case Success(m: OverflowMessage) => + implicit val tid = m.msg.transid + logging.info(this, s"processing overflow msg for activation ${m.msg.activationId}") + //remove from entries (will replace with an overflow entry if it exists locally) + val entryOption = loadBalancerData + .removeActivation(m.msg.activationId) + + //process the activation request: update the invoker ref, and send to invoker + chooseInvoker(m.hash, m.pull, true) match { + case Some(instanceId) => + //Update the invoker name for the overflow ActivationEntry + //The timeout for the activationId will still be effective. 
+ entryOption match { + case Some(entry) => + entry.invokerName = Some(instanceId) + loadBalancerData.putActivation(m.msg.activationId, entry, false) + LoadBalancerService.sendActivationToInvoker(messageProducer, m.msg, instanceId) + case None => + //TODO: adjust the timeout for time spent in overflow topic! + val entry = setupActivation( + m.actionTimeoutSeconds.seconds, + m.msg.activationId, + m.msg.user.uuid, + Some(instanceId), + m.msg.transid, + Some(m.originalController), + true) + loadBalancerData.putActivation(m.msg.activationId, entry, true) + val updatedMsg = m.msg.copy(rootControllerIndex = this.instance) + LoadBalancerService.sendActivationToInvoker(messageProducer, updatedMsg, instanceId) + } + + case None => + //if no invokers available, all activations will go to overflow queue till capacity is available again + logging.error(this, "invalid overflow processing; no invokers have capacity") + //TODO: should requeue to overflow? + } + overflowFeed ! MessageFeed.Processed + + case Failure(t) => + logging.error(this, s"failed processing overflow message: $raw with $t") + } + } /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */ private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt) @@ -312,30 +487,14 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. 
*/ - private def chooseInvoker(user: Identity, action: ExecutableWhiskAction): Future[InstanceId] = { - val hash = generateHash(user.namespace, action) - - loadBalancerData.activationCountPerInvoker.flatMap { currentActivations => - allInvokers.flatMap { invokers => - val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers) - val invokersWithUsage = invokersToUse.view.map { - // Using a view defers the comparably expensive lookup to actual access of the element - case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0)) - } - - LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match { - case Some(invoker) => Future.successful(invoker) - case None => - logging.error(this, s"all invokers down")(TransactionId.invokerHealth) - Future.failed(new LoadBalancerException("no invokers available")) - } - } + protected def chooseInvoker(hash: Int, pull: Boolean, overflow: Boolean): Option[InstanceId] = { + val invokersToUse = if (pull) blackboxInvokers(allInvokersLocal) else managedInvokers(allInvokersLocal) + val currentActivations = loadBalancerData.activationCountPerInvoker + val invokersWithUsage = invokersToUse.view.map { + // Using a view defers the comparably expensive lookup to actual access of the element + case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0)) } - } - - /** Generates a hash based on the string representation of namespace and action */ - private def generateHash(namespace: EntityName, action: ExecutableWhiskAction): Int = { - (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs + LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash, overflow) } } @@ -368,19 +527,18 @@ object LoadBalancerService { /** * Scans through all invokers and searches for an invoker, that has a queue length - * below the defined 
threshold. The threshold is subject to a 3 times back off. Iff - * no "underloaded" invoker was found it will default to the first invoker in the - * step-defined progression that is healthy. + * below the defined threshold. Iff no "underloaded" invoker was found, return None. * * @param invokers a list of available invokers to search in, including their state and usage * @param invokerBusyThreshold defines when an invoker is considered overloaded * @param hash stable identifier of the entity to be scheduled + * @param overflow scheduling during overflow processing (true) *must* find an invoker to use * @return an invoker to schedule to or None of no invoker is available */ - def schedule(invokers: Seq[(InstanceId, InvokerState, Int)], - invokerBusyThreshold: Int, - hash: Int): Option[InstanceId] = { + def schedule(invokers: Seq[(InstanceId, InvokerState, Int)], invokerBusyThreshold: Int, hash: Int, overflow: Boolean)( + implicit logging: Logging): Option[InstanceId] = { + require(invokerBusyThreshold > 0, "invokerBusyThreshold should be > 0") val numInvokers = invokers.size if (numInvokers > 0) { val homeInvoker = hash % numInvokers @@ -394,13 +552,75 @@ object LoadBalancerService { .map(invokers) .filter(_._2 == Healthy) - invokerProgression - .find(_._3 < invokerBusyThreshold) - .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2)) - .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3)) - .orElse(invokerProgression.headOption) - .map(_._1) - } else None + if (overflow) { + //should not arrive here without an invoker who is not busy! 
but just in case, use the step progression with incrementing busy-ness + invokerProgression + .find(_._3 < invokerBusyThreshold) + .orElse({ + logging.warn(this, "scheduling to a busy invoker during overflow processing") + invokerProgression.find(_._3 < invokerBusyThreshold * 2) + }) + .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3)) + .orElse(invokerProgression.headOption) + .map(_._1) + } else { + invokerProgression + .find(_._3 < invokerBusyThreshold) + //don't consider invokers that have reached capacity when not in overflow state + .map(_._1) + } + } else { + logging.warn(this, "no invokers available") + None + } + } + def sendActivationToInvoker(producer: MessageProducer, msg: ActivationMessage, invoker: InstanceId)( + implicit logging: Logging, + ec: ExecutionContext): Future[RecordMetadata] = { + implicit val transid = msg.transid + + val topic = s"invoker${invoker.toInt}" + val start = transid.started( + this, + LoggingMarkers.CONTROLLER_KAFKA, + s"posting topic '$topic' with activation id '${msg.activationId}'") + producer.send(topic, msg).andThen { + case Success(status) => + transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]") + case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic") + } + } + + def createInvokerPool(instance: InstanceId, + actorSystem: ActorSystem, + executionContext: ExecutionContext, + entityStore: EntityStore, + messageProducer: MessageProducer, + healthConsumer: MessageConsumer)(implicit logging: Logging): ActorRef = { + implicit val ec: ExecutionContext = executionContext + // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers. + InvokerPool + .healthAction(instance) + .map { + // Await the creation of the test action; on failure, this will abort the constructor which should + // in turn abort the startup of the controller. 
+ a => + Await.result(InvokerPool.createTestActionForInvokerHealth(entityStore, a), 1.minute) + } + .orElse { + throw new IllegalStateException( + "cannot create test action for invoker health because runtime manifest is not valid") + } + + val invokerFactory = + (f: ActorRefFactory, invokerInstance: InstanceId) => f.actorOf(InvokerActor.props(invokerInstance, instance)) + + actorSystem.actorOf( + InvokerPool + .props( + invokerFactory, + (m, i) => LoadBalancerService.sendActivationToInvoker(messageProducer, m, i), + healthConsumer)) } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala index 92e3789e76..dee21d081b 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala @@ -20,7 +20,6 @@ package whisk.core.loadBalancer import java.util.concurrent.atomic.AtomicInteger import scala.collection.concurrent.TrieMap -import scala.concurrent.Future import whisk.core.entity.{ActivationId, UUID} /** @@ -36,27 +35,36 @@ class LocalLoadBalancerData() extends LoadBalancerData { private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]() private val activationsById = TrieMap[ActivationId, ActivationEntry]() private val totalActivations = new AtomicInteger(0) + private val overflowActivations = new AtomicInteger(0) - override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get) + override def totalActivationCount: Int = totalActivations.get - override def activationCountOn(namespace: UUID): Future[Int] = { - Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)) + override def activationCountOn(namespace: UUID): Int = { + activationByNamespaceId.get(namespace).map(_.get).getOrElse(0) } - override def activationCountPerInvoker: Future[Map[String, Int]] = { - 
Future.successful(activationByInvoker.toMap.mapValues(_.get)) + override def activationCountPerInvoker: Map[String, Int] = { + activationByInvoker.toMap.mapValues(_.get) } + override def overflowActivationCount: Int = overflowActivations.get + override def activationById(activationId: ActivationId): Option[ActivationEntry] = { activationsById.get(activationId) } - override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = { + override def putActivation(id: ActivationId, + update: => ActivationEntry, + isOverflow: Boolean = false): ActivationEntry = { activationsById.getOrElseUpdate(id, { val entry = update totalActivations.incrementAndGet() activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet() - activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).incrementAndGet() + entry.invokerName match { + case Some(i) => activationByInvoker.getOrElseUpdate(i.toString, new AtomicInteger(0)).incrementAndGet() + case None => overflowActivations.incrementAndGet() + } + entry }) } @@ -65,7 +73,10 @@ class LocalLoadBalancerData() extends LoadBalancerData { activationsById.remove(entry.id).map { x => totalActivations.decrementAndGet() activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet() - activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).decrementAndGet() + entry.invokerName match { + case Some(i) => activationByInvoker.getOrElseUpdate(i.toString, new AtomicInteger(0)).decrementAndGet() + case None => overflowActivations.decrementAndGet() + } x } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala index d0595d3ece..3b53fee9e3 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala +++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala @@ -20,31 +20,40 @@ package whisk.core.loadBalancer import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey} +//import akka.cluster.ddata.Key +import akka.cluster.ddata.ORMap +import akka.cluster.ddata.ORMapKey +import akka.cluster.ddata.PNCounterMapKey +import akka.cluster.ddata.{DistributedData, PNCounterMap} import akka.cluster.ddata.Replicator._ import whisk.common.AkkaLogging +import whisk.core.entity.InstanceId -case class IncreaseCounter(key: String, value: Long) -case class DecreaseCounter(key: String, value: Long) +case class IncreaseCounter(key: String, instance: InstanceId, value: Long) +case class DecreaseCounter(key: String, instance: InstanceId, value: Long) case class ReadCounter(key: String) case class RemoveCounter(key: String) +case class Updated(storageName: String, entries: Map[String, Map[Int, Int]]) + case object GetMap /** * Companion object to specify actor properties from the outside, e.g. 
name of the shared map and cluster seed nodes */ object SharedDataService { - def props(storageName: String): Props = - Props(new SharedDataService(storageName)) + def props(storageName: String, monitor: ActorRef): Props = + Props(new SharedDataService(storageName, monitor)) } -class SharedDataService(storageName: String) extends Actor with ActorLogging { +class SharedDataService(storageName: String, monitor: ActorRef) extends Actor with ActorLogging { val replicator = DistributedData(context.system).replicator val logging = new AkkaLogging(context.system.log) - val storage = PNCounterMapKey[String](storageName) + val storage = ORMapKey[String, PNCounterMap[Int]](storageName) // PNCounterMapKey[String](storageName) + + def instanceKey(instance: InstanceId) = PNCounterMapKey[Int](instance.toString) implicit val node = Cluster(context.system) @@ -54,9 +63,11 @@ class SharedDataService(storageName: String) extends Actor with ActorLogging { override def preStart(): Unit = { replicator ! Subscribe(storage, self) node.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) - replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.remove(node, "0")) + replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], writeLocal)(_.remove(node, "0")) + } + override def postStop(): Unit = { + node.unsubscribe(self) } - override def postStop(): Unit = node.unsubscribe(self) /** * CRUD operations on the counter, process cluster member events for logging @@ -64,35 +75,41 @@ class SharedDataService(storageName: String) extends Actor with ActorLogging { */ def receive = { - case (IncreaseCounter(key, increment)) => - replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.increment(key, increment)) - - case (DecreaseCounter(key, decrement)) => - replicator ! 
Update(storage, PNCounterMap[String], writeLocal)(_.decrement(key, decrement)) - + case (IncreaseCounter(key, instance, increment)) => + replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], writeLocal)(m => { + m + (key, m.getOrElse(key, PNCounterMap[Int]()).increment(instance.toInt, increment)) + }) + case (DecreaseCounter(key, instance, decrement)) => + replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], writeLocal)(m => { + m + (key, m.getOrElse(key, PNCounterMap[Int]()).decrement(instance.toInt, decrement)) + }) case GetMap => replicator ! Get(storage, readLocal, request = Some((sender()))) - case MemberUp(member) => logging.info(this, "Member is Up: " + member.address) - case MemberRemoved(member, previousStatus) => logging.warn(this, s"Member is Removed: ${member.address} after $previousStatus") - - case c @ Changed(_) => + case c @ Changed(e) => logging.debug(this, "Current elements: " + c.get(storage)) + val res = c.get(storage).entries.mapValues(_.entries.mapValues(_.toInt)) + if (res.nonEmpty) { + res.values.foreach(_.values.foreach(i => { + require(i >= 0, s"values cannot be less than 0 ${res}") + })) + monitor ! Updated(storageName, res) + } case g @ GetSuccess(_, Some((replyTo: ActorRef))) => - val map = g.get(storage).entries + val map = g.get(storage).entries.mapValues(_.entries) replyTo ! map case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) => if (g.get(storage).contains(key)) { - val response = g.get(storage).getValue(key).intValue() - replyTo ! response + val response = g.get(storage).getOrElse(key, PNCounterMap[Int]()) + replyTo ! response.entries } else replyTo ! 
None - - case _ => // ignore + case msg => + // ignore } } diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala index 9ab2973b78..c71e442c41 100644 --- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala +++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala @@ -19,20 +19,25 @@ package whisk.core.connector.test import java.util.ArrayList import java.util.concurrent.LinkedBlockingQueue - import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.collection.JavaConversions._ - import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.Record import common.StreamLogging +import scala.collection.mutable +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ import whisk.common.Counter +import whisk.common.Logging +import whisk.core.WhiskConfig import whisk.core.connector.Message import whisk.core.connector.MessageConsumer import whisk.core.connector.MessageProducer +import whisk.core.connector.MessagingProvider +import whisk.core.entity.InstanceId class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: Boolean) extends MessageConsumer @@ -110,3 +115,121 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: @volatile private var closed = false private var offset = -1L } + +object TestMessagingProvider extends MessagingProvider { + val queues = mutable.Map[String, LinkedBlockingQueue[Message]]() + + val instanceIdMap = mutable.Map[TestConsumer, InstanceId]() + override def getConsumer(config: WhiskConfig, + groupId: String, + topic: String, + maxPeek: Int, + maxPollInterval: FiniteDuration)(implicit logging: Logging) = { + this.synchronized { + + val queue = queues.getOrElseUpdate(topic, { + new LinkedBlockingQueue[Message]() + }) + new TestConsumer(queue, 
topic, maxPeek, false) + } + + } + + override def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging) = + //connector + new MessageProducer with StreamLogging { + def send(topic: String, msg: Message): Future[RecordMetadata] = { + val queue = queues.getOrElseUpdate(topic, { + new LinkedBlockingQueue[Message]() + }) + queue.synchronized { + if (queue.offer(msg)) { + logging.info(this, s"put: $msg") + Future.successful( + new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1)) + } else { + logging.error(this, s"put failed: $msg") + Future.failed(new IllegalStateException("failed to write msg")) + } + } + } + + def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = { + val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Message]()) + queue.synchronized { + if (queue.addAll(msgs)) { + logging.info(this, s"put: ${msgs.length} messages") + Future.successful( + new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1)) + } else { + logging.error(this, s"put failed: ${msgs.length} messages") + Future.failed(new IllegalStateException("failed to write msg")) + } + } + } + + def close() = {} + def sentCount() = counter.next() + val counter = new Counter() + } + + def occupancy(topic: String) = { + + queues(topic).size() + + } +} + +class TestConsumer(queue: LinkedBlockingQueue[Message], topic: String, val maxPeek: Int, allowMoreThanMax: Boolean) + extends MessageConsumer + with StreamLogging { + var throwCommitException = false + @volatile var dontPeek: Boolean = false + @volatile private var closed = false + var offset = 0l + + /** + * Gets messages via a long poll. May or may not remove messages + * from the message connector. Use commit() to ensure messages are + * removed from the connector. 
+ * + * @param duration for the long poll + * @return iterable collection (topic, partition, offset, bytes) + */ + override def peek(duration: Duration): Iterable[(String, Int, Long, Array[Byte])] = { + require(closed == false, "cannot operate on a closed consumer") + val res = if (dontPeek) { + List.empty + } else { + queue.synchronized { + val msgs = new ArrayList[Message] + queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek) + val res = msgs map { m => + offset += 1 + (topic, -1, offset, m.serialize.getBytes) + } + res + } + } + val sleepTime: Long = duration.toMillis + Thread.sleep(sleepTime) + res + } + + /** + * Commits offsets from last peek operation to ensure they are removed + * from the connector. + */ + override def commit(): Unit = { + if (throwCommitException) { + throw new Exception("commit failed") + } else { + // nothing to do + } + } + + /** Closes consumer. */ + override def close(): Unit = this.closed = true + + def occupancy: Int = queue.size() +} diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala new file mode 100644 index 0000000000..ad155c8b36 --- /dev/null +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.loadBalancer.test + +import akka.actor.ActorSystem +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import common.StreamLogging +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FlatSpecLike +import org.scalatest.Matchers +//import scala.collection.mutable +import scala.concurrent.Promise +import whisk.core.entity.InstanceId +import scala.concurrent.duration._ +import whisk.core.entity.ActivationId +import whisk.core.entity.UUID +import whisk.core.entity.WhiskActivation +import whisk.core.loadBalancer.ActivationEntry +import whisk.core.loadBalancer.DistributedLoadBalancerData +import whisk.core.loadBalancer.Updated + +// Define your test specific configuration here + +object DistributedLoadBalancerDataConfig { + val config = """ + akka.remote.netty.tcp { + hostname = "127.0.0.1" + port = 2555 + } + akka.actor.provider = cluster + """ +} + +class DistributedLoadBalancerDataTests + extends TestKit( + ActorSystem("ControllerCluster", ConfigFactory.parseString(DistributedLoadBalancerDataConfig.config))) + with ImplicitSender + with FlatSpecLike + with Matchers + with BeforeAndAfterAll + with StreamLogging { + + behavior of "DistributedLoadBalancerData" + + //val storageName = "Candidates" + val controller1 = InstanceId(123) + val controller2 = InstanceId(456) + val controller3 = InstanceId(789) + + val invoker1 = InstanceId(0) + val invoker2 = InstanceId(1) + + //val sharedDataService = 
system.actorOf(SharedDataService.props(storageName, testActor), name = "busyMan") + implicit val timeout = Timeout(5.seconds) + + val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]() + val namespace1 = UUID() + val firstEntry: ActivationEntry = + ActivationEntry(ActivationId(), namespace1, Some(invoker1), activationIdPromise) + val secondEntry: ActivationEntry = + ActivationEntry(ActivationId(), namespace1, Some(invoker2), activationIdPromise) + val firstOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), namespace1, None, activationIdPromise) + val secondOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), namespace1, None, activationIdPromise) + + val lbd1 = new DistributedLoadBalancerData(controller1, Some(testActor)) + val lbd2 = new DistributedLoadBalancerData(controller2, Some(testActor)) + + it should "reflect local changes immediately and replicated changes eventually" in { + + //store 1 activation per lbd + lbd1.putActivation(firstEntry.id, firstEntry) + lbd2.putActivation(secondEntry.id, secondEntry) + + //only local changes are visible before replication completes + lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 1 + lbd1.activationCountPerInvoker shouldBe Map(firstEntry.invokerName.get.toString -> 1) + lbd1.activationById(firstEntry.id) shouldBe Some(firstEntry) + + //1 Updated msg per storagename per LBD (4 total when not in overflow) + //we cannot predict order of these updates + expectMsgClass(classOf[Updated]) + expectMsgClass(classOf[Updated]) + expectMsgClass(classOf[Updated]) + expectMsgClass(classOf[Updated]) + + //after replication, verify updates + lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 2 + lbd1.activationCountPerInvoker shouldBe Map( + firstEntry.invokerName.get.toString -> 1, + secondEntry.invokerName.get.toString -> 1) + lbd1.activationById(firstEntry.id) shouldBe Some(firstEntry) + lbd2.activationById(secondEntry.id) shouldBe Some(secondEntry) + + //both entries should
NOT be visible to the other (only counters are replicated) + lbd2.activationById(firstEntry.id) shouldBe None + lbd1.activationById(secondEntry.id) shouldBe None + + // clean up activations + lbd1.removeActivation(firstEntry.id) + lbd2.removeActivation(secondEntry.id) + + //verify local changes on lbd1 + lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 1 + lbd1.activationCountPerInvoker shouldBe Map( + firstEntry.invokerName.get.toString -> 0, + secondEntry.invokerName.get.toString -> 1) + lbd1.activationById(firstEntry.id) shouldBe None + + //verify local changes on lbd2 + lbd2.activationCountOn(secondEntry.namespaceId) shouldBe 1 + lbd2.activationCountPerInvoker shouldBe Map( + firstEntry.invokerName.get.toString -> 1, + secondEntry.invokerName.get.toString -> 0) + lbd2.activationById(secondEntry.id) shouldBe None + + //wait for replication + expectMsgAllClassOf(classOf[Updated], classOf[Updated], classOf[Updated], classOf[Updated]) + + //verify replicated changes on lbd1 + lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 0 + lbd1.activationCountPerInvoker shouldBe Map( + firstEntry.invokerName.get.toString -> 0, + secondEntry.invokerName.get.toString -> 0) + + //verify replicated changes on lbd2 + lbd2.activationCountOn(secondEntry.namespaceId) shouldBe 0 + lbd2.activationCountPerInvoker shouldBe Map( + firstEntry.invokerName.get.toString -> 0, + secondEntry.invokerName.get.toString -> 0) + + } + +} diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index 0e06b9ff2a..20eaba40e3 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -96,6 +96,7 @@ class InvokerSupervisionTests def timeout(actor: ActorRef) = actor ! 
FSM.StateTimeout /** Queries all invokers for their state */ + //TODO: test for Updated message (instead of querying for GetStatus each time) def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]], timeout.duration) diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala index af511021de..2e43b5c495 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala @@ -24,16 +24,16 @@ import org.scalatest.{FlatSpec, Matchers} import whisk.core.entity.{ActivationId, UUID, WhiskActivation} import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData} -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.{Promise} import whisk.core.entity.InstanceId -import scala.concurrent.duration._ - class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]() - val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise) - val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise) + val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), Some(InstanceId(0)), activationIdPromise) + val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), Some(InstanceId(1)), activationIdPromise) + val firstOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), UUID(), None, activationIdPromise) + val secondOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), UUID(), None, activationIdPromise) val port = 2552 val host = "127.0.0.1" @@ -44,32 +44,64 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with 
StreamLogging { .withFallback(ConfigFactory.load()) val actorSystemName = "controller-actor-system" + val instance = InstanceId(0) implicit val actorSystem = ActorSystem(actorSystemName, config) - def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout) + //def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout) behavior of "LoadBalancerData" it should "return the number of activations for a namespace" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() // test all implementations val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) loadBalancerDataArray.map { lbd => lbd.putActivation(firstEntry.id, firstEntry) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 - await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1) +// await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 +// await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.get.toString -> 1) + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 + lbd.activationCountPerInvoker shouldBe Map(firstEntry.invokerName.get.toString -> 1) + lbd.activationById(firstEntry.id) shouldBe Some(firstEntry) + + // clean up after yourself + lbd.removeActivation(firstEntry.id) + } + } + + it should "return actions for invokers only when instanceId is not None" in { + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) + val localLoadBalancerData = new LocalLoadBalancerData() + + val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) + loadBalancerDataArray.map { lbd => + lbd.putActivation(firstEntry.id, firstEntry) + lbd.putActivation(secondEntry.id, secondEntry) + lbd.putActivation(firstOverflowEnty.id, firstOverflowEnty) + 
lbd.putActivation(secondOverflowEnty.id, secondOverflowEnty) + + val res = lbd.activationCountPerInvoker + + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + res.get(secondEntry.invokerName.get.toString()) shouldBe Some(1) + lbd.activationById(firstEntry.id) shouldBe Some(firstEntry) + lbd.activationById(secondEntry.id) shouldBe Some(secondEntry) + + lbd.overflowActivationCount shouldBe 2 // clean up after yourself lbd.removeActivation(firstEntry.id) + lbd.removeActivation(secondEntry.id) + lbd.removeActivation(firstOverflowEnty.id) + lbd.removeActivation(secondOverflowEnty.id) } } it should "return the number of activations for each invoker" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) @@ -77,10 +109,10 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { lbd.putActivation(firstEntry.id, firstEntry) lbd.putActivation(secondEntry.id, secondEntry) - val res = await(lbd.activationCountPerInvoker) + val res = lbd.activationCountPerInvoker - res.get(firstEntry.invokerName.toString()) shouldBe Some(1) - res.get(secondEntry.invokerName.toString()) shouldBe Some(1) + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + res.get(secondEntry.invokerName.get.toString()) shouldBe Some(1) lbd.activationById(firstEntry.id) shouldBe Some(firstEntry) lbd.activationById(secondEntry.id) shouldBe Some(secondEntry) @@ -94,50 +126,56 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { it should "remove activations and reflect that accordingly" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() 
val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) loadBalancerDataArray.map { lbd => lbd.putActivation(firstEntry.id, firstEntry) - val res = await(lbd.activationCountPerInvoker) - res.get(firstEntry.invokerName.toString()) shouldBe Some(1) + lbd.putActivation(firstOverflowEnty.id, firstOverflowEnty) + val res = lbd.activationCountPerInvoker + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 + lbd.overflowActivationCount shouldBe 1 lbd.removeActivation(firstEntry) + lbd.removeActivation(firstOverflowEnty) - val resAfterRemoval = await(lbd.activationCountPerInvoker) - resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0) + val resAfterRemoval = lbd.activationCountPerInvoker + resAfterRemoval.get(firstEntry.invokerName.get.toString()) shouldBe Some(0) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0 + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 0 lbd.activationById(firstEntry.id) shouldBe None + + lbd.overflowActivationCount shouldBe 0 } } it should "remove activations from all 3 maps by activation id" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) loadBalancerDataArray.map { lbd => lbd.putActivation(firstEntry.id, firstEntry) - val res = await(lbd.activationCountPerInvoker) - res.get(firstEntry.invokerName.toString()) shouldBe Some(1) + val res = lbd.activationCountPerInvoker + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) lbd.removeActivation(firstEntry.id) - val resAfterRemoval = await(lbd.activationCountPerInvoker) - resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe 
Some(0) + val resAfterRemoval = lbd.activationCountPerInvoker + resAfterRemoval.get(firstEntry.invokerName.get.toString()) shouldBe Some(0) } } it should "return None if the entry doesn't exist when we remove it" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) @@ -149,65 +187,79 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { it should "respond with different values accordingly" in { - val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise) + val entry = ActivationEntry(ActivationId(), UUID(), Some(InstanceId(1)), activationIdPromise) val entrySameInvokerAndNamespace = entry.copy(id = ActivationId()) val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID()) + val entryNoInvoker = entry.copy(id = ActivationId(), namespaceId = UUID(), invokerName = None) - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) loadBalancerDataArray.map { lbd => lbd.putActivation(entry.id, entry) - await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1 - var res = await(lbd.activationCountPerInvoker) - res.get(entry.invokerName.toString()) shouldBe Some(1) + lbd.activationCountOn(entry.namespaceId) shouldBe 1 + var res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(1) lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace) - await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2 - res = await(lbd.activationCountPerInvoker) - 
res.get(entry.invokerName.toString()) shouldBe Some(2) + lbd.activationCountOn(entry.namespaceId) shouldBe 2 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(2) + + lbd.putActivation(entryNoInvoker.id, entryNoInvoker) + lbd.activationCountOn(entry.namespaceId) shouldBe 2 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(2) + lbd.overflowActivationCount shouldBe 1 lbd.putActivation(entrySameInvoker.id, entrySameInvoker) - await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2 - res = await(lbd.activationCountPerInvoker) - res.get(entry.invokerName.toString()) shouldBe Some(3) + lbd.activationCountOn(entry.namespaceId) shouldBe 2 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(3) lbd.removeActivation(entrySameInvokerAndNamespace) - await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1 - res = await(lbd.activationCountPerInvoker) - res.get(entry.invokerName.toString()) shouldBe Some(2) + lbd.activationCountOn(entry.namespaceId) shouldBe 1 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(2) + + lbd.removeActivation(entryNoInvoker) + lbd.activationCountOn(entry.namespaceId) shouldBe 1 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(2) + lbd.overflowActivationCount shouldBe 0 // removing non existing entry doesn't mess up lbd.removeActivation(entrySameInvokerAndNamespace) - await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1 - res = await(lbd.activationCountPerInvoker) - res.get(entry.invokerName.toString()) shouldBe Some(2) + lbd.activationCountOn(entry.namespaceId) shouldBe 1 + res = lbd.activationCountPerInvoker + res.get(entry.invokerName.get.toString()) shouldBe Some(2) // clean up lbd.removeActivation(entry) lbd.removeActivation(entrySameInvoker.id) + lbd.removeActivation(entryNoInvoker) } } it should "not add the 
same entry more then once" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) loadBalancerDataArray.map { lbd => lbd.putActivation(firstEntry.id, firstEntry) - val res = await(lbd.activationCountPerInvoker) - res.get(firstEntry.invokerName.toString()) shouldBe Some(1) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 + val res = lbd.activationCountPerInvoker + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 lbd.putActivation(firstEntry.id, firstEntry) - val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker) - resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 + val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker + resAfterAddingTheSameEntry.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 lbd.removeActivation(firstEntry) lbd.removeActivation(firstEntry) @@ -217,7 +269,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { it should "not evaluate the given block if an entry already exists" in { - val distributedLoadBalancerData = new DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) @@ -248,7 +300,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { it should "not evaluate the given block even if an entry is different (but has the same id)" in { - val distributedLoadBalancerData = new 
DistributedLoadBalancerData() + val distributedLoadBalancerData = new DistributedLoadBalancerData(instance) val localLoadBalancerData = new LocalLoadBalancerData() val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData) @@ -263,9 +315,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { called shouldBe 1 - val res = await(lbd.activationCountPerInvoker) - res.get(firstEntry.invokerName.toString()) shouldBe Some(1) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 + val res = lbd.activationCountPerInvoker + res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 // entry already exists, should not evaluate the block and change the state val entryAfterSecond = lbd.putActivation(entrySameId.id, { @@ -275,9 +327,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { called shouldBe 1 entry shouldBe entryAfterSecond - val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker) - resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1) - await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1 + val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker + resAfterAddingTheSameEntry.get(firstEntry.invokerName.get.toString()) shouldBe Some(1) + lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1 } } diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala index 3f3dca4794..fecc214bca 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala @@ -17,15 +17,17 @@ package whisk.core.loadBalancer.test +import common.StreamLogging import org.junit.runner.RunWith -import 
org.scalatest.junit.JUnitRunner -import org.scalatest.FlatSpec +import org.scalatest.FlatSpecLike import org.scalatest.Matchers -import whisk.core.loadBalancer.LoadBalancerService +import org.scalatest.junit.JUnitRunner + +import whisk.core.entity.InstanceId import whisk.core.loadBalancer.Healthy +import whisk.core.loadBalancer.LoadBalancerService import whisk.core.loadBalancer.Offline import whisk.core.loadBalancer.UnHealthy -import whisk.core.entity.InstanceId /** * Unit tests for the ContainerPool object. @@ -34,7 +36,7 @@ import whisk.core.entity.InstanceId * of the ContainerPool object. */ @RunWith(classOf[JUnitRunner]) -class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { +class LoadBalancerServiceObjectTests extends FlatSpecLike with Matchers with StreamLogging { behavior of "memoize" it should "not recompute a value which was already given" in { @@ -78,34 +80,46 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size) it should "return None on an empty invokers list" in { - LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None + LoadBalancerService.schedule(IndexedSeq(), 1, 1, false) shouldBe None + LoadBalancerService.schedule(IndexedSeq(), 1, 1, true) shouldBe None } it should "return None on a list of offline/unhealthy invokers" in { val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0)) - LoadBalancerService.schedule(invs, 0, 1) shouldBe None + LoadBalancerService.schedule(invs, 1, 1, false) shouldBe None + LoadBalancerService.schedule(invs, 1, 1, true) shouldBe None } it should "schedule to the home invoker" in { val invs = invokers(10) val hash = 2 - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, false) shouldBe Some(InstanceId(hash % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, true) shouldBe Some(InstanceId(hash % 
invs.size)) } it should "take the only online invoker" in { LoadBalancerService.schedule( IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)), - 0, - 1) shouldBe Some(InstanceId(2)) + 1, + 1, + false) shouldBe Some(InstanceId(2)) + + LoadBalancerService.schedule( + IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)), + 1, + 1, + true) shouldBe Some(InstanceId(2)) + } it should "skip an offline/unhealthy invoker, even if its underloaded" in { val hash = 0 - val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)) + val invs = IndexedSeq((InstanceId(0), Healthy, 11), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)) - LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2)) + LoadBalancerService.schedule(invs, 10, hash, false) shouldBe Some(InstanceId(2)) + LoadBalancerService.schedule(invs, 10, hash, true) shouldBe Some(InstanceId(2)) } it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in { @@ -116,7 +130,8 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1)) val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, true) shouldBe Some(InstanceId((hash + step) % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, false) shouldBe Some(InstanceId((hash + step) % invs.size)) } it should "wrap the search at the end of the invoker list" in { @@ -130,7 +145,8 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target // invoker0 is 
overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded - LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, true) shouldBe Some(InstanceId((hash + step + step) % invs.size)) + LoadBalancerService.schedule(invs, 1, hash, false) shouldBe Some(InstanceId((hash + step + step) % invs.size)) } it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in { @@ -140,22 +156,30 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers { // even though invoker1 is not the home invoker in this case, it gets chosen over // the others because it's the first one encountered by the iteration mechanism to be below // the threshold of 3 * 16 invocations - LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) + LoadBalancerService.schedule(invs, 16, hash, true) shouldBe Some(InstanceId(0)) + // when not in overflow state, won't iterate and progress the busy threshold + LoadBalancerService.schedule(invs, 16, hash, false) shouldBe None } it should "choose the home invoker if all invokers are overloaded even above the muliplied threshold" in { val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1), Healthy, 50), (InstanceId(2), Healthy, 49)) val hash = 0 // home is 0, stepsize is 1 - LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0)) + LoadBalancerService.schedule(invs, 16, hash, true) shouldBe Some(InstanceId(0)) + // when not in overflow state, won't iterate and progress the busy threshold + LoadBalancerService.schedule(invs, 16, hash, false) shouldBe None } it should "transparently work with partitioned sets of invokers" in { val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0)) - LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3)) - LoadBalancerService.schedule(invs, 1, 
1) shouldBe Some(InstanceId(4)) - LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5)) - LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3)) + LoadBalancerService.schedule(invs, 1, 0, true) shouldBe Some(InstanceId(3)) + LoadBalancerService.schedule(invs, 1, 1, true) shouldBe Some(InstanceId(4)) + LoadBalancerService.schedule(invs, 1, 2, true) shouldBe Some(InstanceId(5)) + LoadBalancerService.schedule(invs, 1, 3, true) shouldBe Some(InstanceId(3)) + LoadBalancerService.schedule(invs, 1, 0, false) shouldBe Some(InstanceId(3)) + LoadBalancerService.schedule(invs, 1, 1, false) shouldBe Some(InstanceId(4)) + LoadBalancerService.schedule(invs, 1, 2, false) shouldBe Some(InstanceId(5)) + LoadBalancerService.schedule(invs, 1, 3, false) shouldBe Some(InstanceId(3)) } } diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala new file mode 100644 index 0000000000..eb9384361f --- /dev/null +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.loadBalancer.test + +import akka.actor.Actor +import akka.actor.ActorSystem +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import common.StreamLogging +import java.time.Instant +import org.scalamock.scalatest.MockFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FlatSpecLike +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable.ListBuffer +import scala.collection.mutable +import scala.concurrent.Await +import scala.concurrent.Future +import spray.json.JsNumber +import spray.json.JsObject +import whisk.common.TransactionId +import whisk.core.WhiskConfig +import whisk.core.connector.ActivationMessage +import whisk.core.connector.CompletionMessage +import whisk.core.connector.MessagingProvider +import whisk.core.connector.test.TestConsumer +import whisk.core.loadBalancer.DistributedLoadBalancerData +import whisk.core.loadBalancer.StaticSeedNodesProvider +import whisk.core.loadBalancer.Updated +import whisk.core.connector.test.TestMessagingProvider +import whisk.core.entitlement.Privilege +import whisk.core.entity.ActivationId +import whisk.core.entity.ActivationResponse +import whisk.core.entity.AuthKey +import whisk.core.entity.CodeExecAsString +import whisk.core.entity.DocRevision +import whisk.core.entity.EntityName +import whisk.core.entity.EntityPath +import whisk.core.entity.ExecManifest +import whisk.core.entity.ExecManifest.ImageName +import whisk.core.entity.ExecManifest.RuntimeManifest +import whisk.core.entity.ExecutableWhiskAction +import whisk.core.entity.Identity +import whisk.core.entity.InstanceId +import whisk.core.entity.Secret +import whisk.core.entity.Subject +import whisk.core.entity.UUID +import whisk.core.entity.WhiskActivation 
+import whisk.core.entity.WhiskEntityStore +import whisk.core.loadBalancer.GetStatus +import whisk.core.loadBalancer.Healthy +import whisk.core.loadBalancer.LoadBalancerActorService +import whisk.core.loadBalancer.StatusUpdate +import whisk.core.loadBalancer.SubscribeLoadBalancer +import whisk.spi.SpiLoader +import whisk.spi.TypesafeConfigClassResolver + +object LoadBlanacerTestKitConfig { + val config = """ + akka.actor.provider = cluster + whisk.spi.MessagingProvider = whisk.core.connector.test.TestMessagingProvider + """ +} +class OverflowTests + extends TestKit(ActorSystem("ControllerCluster", ConfigFactory.parseString(LoadBlanacerTestKitConfig.config))) + with ImplicitSender + with FlatSpecLike + with Matchers + with BeforeAndAfterAll + with MockFactory + with StreamLogging { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + def invokers(n: Int, messagingProvider: MessagingProvider, whiskConfig: WhiskConfig) = + (0 until n).map( + i => + ( + InstanceId(i), + Healthy, + messagingProvider + .getConsumer(whiskConfig, "invokers", s"invoker${i}", 10, maxPollInterval = 50.milliseconds))) + def activation(id: ActivationId) = + WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = id, + start = Instant.now(), + end = Instant.now(), + response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))), + duration = Some(123)) + + implicit val ec = system.dispatcher + + behavior of "overflow" + + //allow our config to feed the SpiLoader + TypesafeConfigClassResolver.config = system.settings.config + + val whiskConfig = new WhiskConfig( + WhiskEntityStore.requiredProperties ++ + ExecManifest.requiredProperties ++ + Map(WhiskConfig.loadbalancerInvokerBusyThreshold -> "1")) + + val invs = invokers(4, TestMessagingProvider, whiskConfig) + val poolActor = system.actorOf( + Props(new Actor { + override def receive = { + case SubscribeLoadBalancer(lbActor) => + lbActor ! 
StatusUpdate(invs.map(i => (i._1, i._2))) + case GetStatus => + sender() ! invs.map(i => (i._1, i._2)) + } + }), + "testpool") + val producer = TestMessagingProvider.getProducer(whiskConfig, system.dispatcher) + + ExecManifest.initialize(whiskConfig) + // handle on the entity datastore + val entityStore = WhiskEntityStore.datastore(whiskConfig) + val authKey = AuthKey(UUID(), Secret()) + + /** Specify how seed nodes are generated */ + val seedNodesProvider = new StaticSeedNodesProvider(whiskConfig.controllerSeedNodes, system.name) + Cluster(system).joinSeedNodes(seedNodesProvider.getSeedNodes()) + + it should "switch to overflow once capacity is exhausted, and switch back to underflow once capacity is available" in { + + val monitor = TestProbe() + val controllerInstance = 9 + val messagingProvider = SpiLoader.get[MessagingProvider] +// val pingConsumer = createPingConsumer(messagingProvider, InstanceId(controllerInstance)) +// val ackConsumer = createAckConsumer(messagingProvider, InstanceId(controllerInstance)) +// val overflowConsumer = createOverflowConsumer(messagingProvider, InstanceId(controllerInstance)) + + val instance = InstanceId(controllerInstance) + val lbData = new DistributedLoadBalancerData(instance, Some(testActor)) + val lb = new LoadBalancerActorService( + whiskConfig, + instance, + poolActor, +// pingConsumer, +// ackConsumer, +// overflowConsumer, + lbData) +// lb.updateInvokers(invs.map(i => (i._1, i._2))) + val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) + val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) + + //verify underflow + // lb.overflowState.get() shouldBe false + //TODO: use default (16) for threshold + val futures = ListBuffer[Future[Any]]() + + //there is 1 invoker reserved for blackbox currently: + val numInvokers = invs.size - 1 + val completions = ListBuffer[CompletionMessage](); + (1 to numInvokers).foreach(i => { +// futures += 
lb.publish(action, createActivation(action, idGen.make(), TransactionId(i)))(TransactionId(i)) + + val id = idGen.make() + //activations += id + futures += lb.publish(action, createActivation(action, id, TransactionId(i)))(TransactionId(i)) + completions += CompletionMessage(TransactionId.testing, Right(activation(id)), invs(i - 1)._1) + }) + //wait for queueing + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 1 + TestMessagingProvider.occupancy("invoker1") shouldBe 1 + TestMessagingProvider.occupancy("invoker2") shouldBe 1 + } + //wait for replication +// eventually(timeout(5000 millis), interval(50 millis)) { + expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 3)))) + expectMsg( + Updated( + "Invokers", + Map("InstanceId(1)" -> Map(9 -> 1), "InstanceId(2)" -> Map(9 -> 1), "InstanceId(0)" -> Map(9 -> 1)))) + // } + monitor.expectNoMsg() + //disable reading from overflow + lb.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true +// TestMessagingProvider.paused += "overflow" + + //1 more message will overflow + val id = idGen.make() + futures += lb.publish(action, createActivation(action, id, TransactionId(100)))(TransactionId(100)) + + //monitor.expectMsg(Overflow) + //wait for replication +// expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 1)))) +// expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 4)))) + + //process messages + (1 to invs.size - 1).foreach(index => { + val i = invs(index - 1) + val msgs = i._3.peek(100.milliseconds) + //println(s"found ${msgs.size} in invoker${i._1}") + i._3.commit() + }) + + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("overflow") shouldBe 1 + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 0 + TestMessagingProvider.occupancy("invoker2") shouldBe 0 + } + //reenable the overflow processing + 
lb.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = false +// TestMessagingProvider.paused -= "overflow" + //send 1 completion + //val id = activations.remove(0).activationId + //val completion = CompletionMessage(TransactionId.testing, Right(activation(id)), InstanceId(0)) +// val completion = CompletionMessage(TransactionId.testing, Right(activation(id)), InstanceId(0)) +// +// producer.send(s"completed${controllerInstance}", completion) + + //verify underflow + //monitor.expectMsg(Underflow) + + //complete the other activations +// val lastActivation = activations.size +// (1 to lastActivation).foreach(i => { +// val id = activations.remove(0).activationId +// val completion = CompletionMessage(TransactionId.testing, Right(activation(id)), InstanceId(i)) +// producer.send(s"completed${controllerInstance}", completion) +// +// }) + completions.foreach(producer.send(s"completed${controllerInstance}", _)) + + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker1") shouldBe 1 + + } + + val msgs = invs(1)._3.peek(100.milliseconds) + //println(s"found ${msgs.size} in invoker${invs(1)._1}") + +// expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 4)))) +// expectMsg( +// Updated( +// "Invokers", +// Map("InstanceId(0)" -> Map(8 -> 0), "InstanceId(1)" -> Map(8 -> 1), "InstanceId(2)" -> Map(8 -> 0)))) + + //invs(1)._3.commit() + +// //wait for replication +// eventually(timeout(5000 millis), interval(50 millis)) { +// expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 1)))) +// expectMsg( +// Updated( +// "Invokers", +// Map("InstanceId(0)" -> Map(8 -> 0), "InstanceId(1)" -> Map(8 -> 1), "InstanceId(2)" -> Map(8 -> 0)))) +// } + + eventually(timeout(5000 millis), interval(50 millis)) { + expectMsg(Updated("Overflow", Map("overflow" -> Map(9 -> 0)))) + } + eventually(timeout(5000 millis), interval(50 millis)) { + //expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 0)))) + 
TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 0 + TestMessagingProvider.occupancy("invoker2") shouldBe 0 + TestMessagingProvider.occupancy("overflow") shouldBe 0 + + } + + //println(s"sending ${completions.size} completions") + completions.foreach(producer.send(s"completed${controllerInstance}", _)) + //println("waiting for completed9 to drain...") + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy(s"completed${controllerInstance}") shouldBe 0 + } + //println("drain completed...") + Await.ready(Future.sequence(futures), 5.seconds) + + //send completion for the overflow msg + + producer.send( + s"completed${controllerInstance}", + CompletionMessage(TransactionId.testing, Right(activation(id)), invs(1)._1)) + + //wait for replication + eventually(timeout(5000 millis), interval(50 millis)) { + expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 0)))) + //expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 0)))) + expectMsg( + Updated( + "Invokers", + Map("InstanceId(0)" -> Map(9 -> 0), "InstanceId(1)" -> Map(9 -> 0), "InstanceId(2)" -> Map(9 -> 0)))) + } + +// lb.overflowConsumer.close() +// lb.activeAckConsumer.close() + + lb.lbActor ! 
PoisonPill + } + + it should "allow controller1 to process requests from controller0 when controller0 goes into overflow" in { + + val monitor = TestProbe() + val messagingProvider = SpiLoader.get[MessagingProvider] + //configure lb1 to NOT process overflow messages + val instance1 = new InstanceId(9) + val instance2 = InstanceId(10) +// val pingConsumer = createPingConsumer(messagingProvider, instance1) + val maxPingsPerPoll = 54 + + val maxActiveAcksPerPoll = 54 + val maxOverflowPerPoll = 5 + val overflowCapacity = Some(5) + +// val activeAckConsumer1 = createAckConsumer(messagingProvider, instance1) +// val activeAckConsumer2 = createAckConsumer(messagingProvider, instance2) +// val overflowConsumer1 = createOverflowConsumer(messagingProvider, instance1) +// val overflowConsumer2 = createOverflowConsumer(messagingProvider, instance2) + + //start with nothing queued + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 0 + TestMessagingProvider.occupancy("invoker2") shouldBe 0 + + //disable reading from overflow on lb1 +// lb1.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true +// TestMessagingProvider.paused += "overflow" + + val lbData1 = new DistributedLoadBalancerData(instance1, Some(testActor)) + val lbData2 = new DistributedLoadBalancerData(instance2, Some(testActor)) + val lb1 = + new LoadBalancerActorService(whiskConfig, InstanceId(9), poolActor, lbData1) + lb1.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true + lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true + TestMessagingProvider.occupancy("overflow") shouldBe 0 + + val lb2 = + new LoadBalancerActorService(whiskConfig, InstanceId(10), poolActor, lbData2) +// lb1.updateInvokers(invs.map(i => (i._1, i._2))) +// lb2.updateInvokers(invs.map(i => (i._1, i._2))) + val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) + val action = 
ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) + + //verify underflow + // lb.overflowState.get() shouldBe false + //TODO: use default (16) for threshold + val futures = ListBuffer[Future[Any]]() + + //there is 1 invoker reserved for blackbox currently: + val numInvokers = invs.size - 1 + //send 1 activation per invoker + val activations = ListBuffer[ActivationId](); + val completions = ListBuffer[CompletionMessage](); + (1 to numInvokers).foreach(i => { + val id = idGen.make() + activations += id + futures += lb1.publish(action, createActivation(action, id, TransactionId(i)))(TransactionId(i)) + completions += CompletionMessage(TransactionId.testing, Right(activation(id)), invs(i - 1)._1) + }) + eventually(timeout(5000 millis), interval(50 millis)) { + expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 3)))) +// expectMsg( +// Updated( +// "Invokers", +// Map("InstanceId(0)" -> Map(9 -> 1), "InstanceId(1)" -> Map(9 -> 1), "InstanceId(2)" -> Map(9 -> 1)))) + + } + // + + //wait for queueing + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 1 + TestMessagingProvider.occupancy("invoker1") shouldBe 1 + TestMessagingProvider.occupancy("invoker2") shouldBe 1 + } + + //TestMessagingProvider.consumers("overflow").dontPeek = true + //overflowConsumer1.asInstanceOf[TestConsumer].dontPeek = true + + //publish one more to cause overflow + val overflowedActivation = createActivation(action, idGen.make(), TransactionId(4)) + futures += lb1.publish(action, overflowedActivation)(TransactionId(4)) + + //wait for overflow queueing + //monitor.expectMsg(Overflow) + + eventually(timeout(5000 millis), interval(50 millis)) { + expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 4)))) + + } + + //wait for overflow draining + //make sure it was sent to invoker1 + eventually(timeout(15000 millis), interval(50 millis)) { + 
TestMessagingProvider.occupancy("invoker1") shouldBe 2 + + } + //disable processing of acks in both lbs, to verify queue routing + lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true + lb2.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true +// TestMessagingProvider.paused += "completed9" +// TestMessagingProvider.paused += "completed10" + + //emulate completion in invoker TODO: create an invoker emulator + val id = overflowedActivation.activationId + val completion = CompletionMessage(TransactionId.testing, Right(activation(id)), InstanceId(1)) + producer.send(s"completed10", completion) + //we should first get a completion for lb2 (where overflow was processed) + //TestMessagingProvider.occupancy("completed10") shouldBe 1 + + //we should first get a completion for lb2 (where overflow was processed) + eventually(timeout(5000 millis), interval(50 millis)) { +// activeAckConsumer2.asInstanceOf[TestConsumer].offset shouldBe 1 + TestMessagingProvider.occupancy("completed10") shouldBe 1 + } + + //disable lb1 ack processing + //println("disabling peek on lb1") + //activeAckConsumer1.asInstanceOf[TestConsumer].dontPeek = true + //TestMessagingProvider.paused -= "completed9" + + //reenable lb2 ack processing + lb2.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = false +// TestMessagingProvider.paused -= "completed10" + + //println("processing ack2 again") + //we should then get a completion for lb1 (where initial publish occurred) + eventually(timeout(5000 millis), interval(50 millis)) { +// activeAckConsumer1.asInstanceOf[TestConsumer].offset shouldBe 1 + TestMessagingProvider.occupancy("completed9") shouldBe 1 + } + + //renable lb1 ack processing + lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = false +// TestMessagingProvider.paused -= "completed9" + + //monitor.expectMsg(Underflow) + //println(s"waiting for ${futures.size} futures") + + completions.foreach(producer.send(s"completed9", _)) + + 
Await.ready(Future.sequence(futures), 5.seconds) + +// lb1.activeAckConsumer.close() +// lb2.activeAckConsumer.close() +// lb1.overflowConsumer.close() +// lb2.overflowConsumer.close() + lb1.lbActor ! PoisonPill + lb2.lbActor ! PoisonPill + + invs.foreach(i => { + val msgs = i._3.peek(100.milliseconds) + //println(s"found ${msgs.size} in invoker${i._1}") + }) + //end with nothing queued + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 0 + TestMessagingProvider.occupancy("invoker2") shouldBe 0 + TestMessagingProvider.occupancy("overflow") shouldBe 0 + } + } + it should "rely on replicated data to decide" in { + + val monitor = TestProbe() + val messagingProvider = SpiLoader.get[MessagingProvider] + //configure lb1 to NOT process overflow messages + val instance1 = InstanceId(9) + val instance2 = InstanceId(10) + val pingConsumer = createPingConsumer(messagingProvider, instance1) + val maxPingsPerPoll = 54 + + val maxActiveAcksPerPoll = 54 + val maxOverflowPerPoll = 5 + val overflowCapacity = Some(5) + + val activeAckConsumer1 = createAckConsumer(messagingProvider, instance1) + val activeAckConsumer2 = createAckConsumer(messagingProvider, instance2) + val overflowConsumer1 = createOverflowConsumer(messagingProvider, instance1) + val overflowConsumer2 = createOverflowConsumer(messagingProvider, instance2) + + val lbData1 = new DistributedLoadBalancerData(instance1, Some(testActor)) + val lbData2 = new DistributedLoadBalancerData(instance2, Some(testActor)) + + //start with nothing queued + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 0 + TestMessagingProvider.occupancy("invoker2") shouldBe 0 + TestMessagingProvider.occupancy("overflow") shouldBe 0 + + //disable reading from overflow on lb1 + overflowConsumer1.asInstanceOf[TestConsumer].dontPeek = true + + val lb1 = + new 
LoadBalancerActorService( + whiskConfig, + instance1, +// entityStore, + poolActor, +// pingConsumer, +// activeAckConsumer1, +// overflowConsumer1, + lbData1) + val lb2 = new LoadBalancerActorService( + whiskConfig, + instance2, +// entityStore, + poolActor, +// pingConsumer, +// activeAckConsumer2, +// overflowConsumer2, + lbData2) +// lb1.updateInvokers(invs.map(i => (i._1, i._2))) +// lb2.updateInvokers(invs.map(i => (i._1, i._2))) + + val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) + val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) + + //verify underflow + // lb.overflowState.get() shouldBe false + //TODO: use default (16) for threshold + val futures = ListBuffer[Future[Any]]() + + //there is 1 invoker reserved for blackbox currently: + val numInvokers = invs.size - 1 + //send 1 activation per invoker + val activations = ListBuffer[ActivationId](); + val completions = ListBuffer[CompletionMessage](); + (1 to numInvokers - 1).foreach(i => { + val id = idGen.make() + activations += id + futures += lb1.publish(action, createActivation(action, id, TransactionId(i)))(TransactionId(i)) + completions += CompletionMessage(TransactionId.testing, Right(activation(id)), invs(i - 1)._1) + }) + + //DO NOT WAIT FOR REPLICATION HERE + + //wait for queueing - the action hash will cause scheduling to start with invoker1 + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 1 + TestMessagingProvider.occupancy("invoker2") shouldBe 1 + } + (1 to 1).foreach(i => { + val id = idGen.make() + activations += id + futures += lb2.publish(action, createActivation(action, id, TransactionId(i)))(TransactionId(i)) + completions += CompletionMessage(TransactionId.testing, Right(activation(id)), invs(i - 1)._1) + }) + + //before replication, the scheduling will still start at 
invoker1 + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 0 + TestMessagingProvider.occupancy("invoker1") shouldBe 2 + TestMessagingProvider.occupancy("invoker2") shouldBe 1 + } + //wait for replication + //expectMsgClass(classOf[Updated]) + + eventually(timeout(5000 millis), interval(50 millis)) { + //without shared replicator: + expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 2, 10 -> 1)))) + expectMsg( + Updated( + "Invokers", + Map("InstanceId(1)" -> Map(9 -> 1, 10 -> 1), "InstanceId(2)" -> Map(9 -> 1), "InstanceId(0)" -> Map(9 -> 0)))) + //with shared replicator: + + } + + (1 to 1).foreach(i => { + val id = idGen.make() + activations += id + futures += lb2.publish(action, createActivation(action, id, TransactionId(i)))(TransactionId(i)) + completions += CompletionMessage(TransactionId.testing, Right(activation(id)), invs(i - 1)._1) + }) + + //after replication, the scheduling will still start at invoker0 + eventually(timeout(5000 millis), interval(50 millis)) { + TestMessagingProvider.occupancy("invoker0") shouldBe 1 + TestMessagingProvider.occupancy("invoker1") shouldBe 2 + TestMessagingProvider.occupancy("invoker2") shouldBe 1 + } + + lb1.lbActor ! PoisonPill + lb2.lbActor ! 
PoisonPill + + } + + val idGen = new ActivationId.ActivationIdGenerator {} + + val activations = mutable.ListBuffer[ActivationMessage]() + + def createActivation(action: ExecutableWhiskAction, id: ActivationId, transid: TransactionId) = { + val a = ActivationMessage( + transid = transid, + action = action.fullyQualifiedName(true), + revision = DocRevision.empty, + user = Identity(Subject("unhealthyInvokerCheck"), EntityName("unhealthyInvokerCheck"), authKey, Set[Privilege]()), + activationId = id, + activationNamespace = EntityPath("guest"), + rootControllerIndex = InstanceId(0), + blocking = false, + content = None) + activations += a + a + } + + def createPingConsumer(messagingProvider: MessagingProvider, instanceId: InstanceId) = + messagingProvider.getConsumer( + whiskConfig, + s"health${instanceId.toInt}", + "health", + maxPeek = 128, + maxPollInterval = 200.millis) + def createAckConsumer(messagingProvider: MessagingProvider, instanceId: InstanceId) = + messagingProvider.getConsumer( + whiskConfig, + "completions", + s"completed${instanceId.toInt}", + maxPeek = 128, + maxPollInterval = 200.millis) + def createOverflowConsumer(messagingProvider: MessagingProvider, instanceId: InstanceId) = + messagingProvider.getConsumer(whiskConfig, "overflow", s"overflow", maxPeek = 1, maxPollInterval = 200.millis) +} diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala index 3961e539ca..0774fb3ef9 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala @@ -25,8 +25,8 @@ import com.typesafe.config.ConfigFactory import org.scalatest._ import whisk.core.loadBalancer._ import org.scalatest.FlatSpecLike - import scala.concurrent.duration._ +import whisk.core.entity.InstanceId // Define your test specific configuration here @@ -61,7 +61,9 @@ 
class SharedDataServiceTests() .withFallback(ConfigFactory.load()) val s = ActorSystem("controller-actor-system", config) - val sharedDataService = s.actorOf(SharedDataService.props("Candidates"), name = "busyMan") + val storageName = "Candidates" + val instance = InstanceId(123) + val sharedDataService = s.actorOf(SharedDataService.props(storageName, testActor), name = "busyMan") implicit val timeout = Timeout(5.seconds) it should "retrieve an empty map after initialization" in { @@ -70,22 +72,37 @@ class SharedDataServiceTests() expectMsg(msg) } it should "increase the counter" in { - sharedDataService ! (IncreaseCounter("Donald", 1)) + sharedDataService ! (IncreaseCounter("Donald", instance, 1)) + val msg = Map("Donald" -> Map(instance.toInt -> 1)) + expectMsg(Updated(storageName, msg)) sharedDataService ! GetMap - val msg = Map("Donald" -> 1) expectMsg(msg) } it should "decrease the counter" in { - sharedDataService ! (IncreaseCounter("Donald", 2)) - sharedDataService ! (DecreaseCounter("Donald", 2)) + //verify starting at 1 sharedDataService ! GetMap - val msg = Map("Donald" -> 1) + val msg = Map("Donald" -> Map(instance.toInt -> 1)) expectMsg(msg) + + //increase and verify change + sharedDataService ! (IncreaseCounter("Donald", instance, 2)) + val msg2 = Map("Donald" -> Map(instance.toInt -> 3)) + expectMsg(Updated(storageName, msg2)) + sharedDataService ! GetMap + expectMsg(msg2) + + //decrease and verify change + sharedDataService ! (DecreaseCounter("Donald", instance, 2)) + val msg3 = Map("Donald" -> Map(instance.toInt -> 1)) + expectMsg(Updated(storageName, msg3)) + sharedDataService ! GetMap + expectMsg(msg3) } it should "receive the map with all counters" in { - sharedDataService ! (IncreaseCounter("Hilary", 1)) + sharedDataService ! (IncreaseCounter("Hilary", instance, 1)) + val msg = Map("Hilary" -> Map(instance.toInt -> 1), "Donald" -> Map(instance.toInt -> 1)) + expectMsg(Updated(storageName, msg)) sharedDataService ! 
GetMap - val msg = Map("Hilary" -> 1, "Donald" -> 1) expectMsg(msg) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services