This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 0f33993 Throttle message bus consumption. (#2425) 0f33993 is described below commit 0f3399372c6d7f047983707d8249707668daa746 Author: rodric rabbah <rod...@gmail.com> AuthorDate: Thu Jul 13 06:47:49 2017 -0700 Throttle message bus consumption. (#2425) Replaces unthrottled Kafka consumer with FSM actor that can throttle the consumption. Applied for active active acks and invoker health pings. Implement capacity based sends. --- .../connector/kafka/KafkaConsumerConnector.scala | 29 ---- .../main/scala/whisk/core/connector/Message.scala | 2 +- .../whisk/core/connector/MessageConsumer.scala | 192 ++++++++++++++++++++- .../core/loadBalancer/InvokerSupervision.scala | 36 ++-- .../core/loadBalancer/LoadBalancerService.scala | 37 +++- .../whisk/core/containerpool/ContainerPool.scala | 5 +- .../whisk/core/dispatcher/ActivationFeed.scala | 122 ------------- .../scala/whisk/core/dispatcher/Dispatcher.scala | 29 ++-- .../main/scala/whisk/core/invoker/Invoker.scala | 20 +-- .../scala/whisk/core/invoker/InvokerReactive.scala | 5 +- docs/actions.md | 7 +- .../test/scala/services/KafkaConnectorTests.scala | 54 +----- .../core/connector/test/MessageFeedTests.scala | 191 ++++++++++++++++++++ .../test/TestConnector.scala | 36 ++-- .../containerpool/test/ContainerPoolTests.scala | 14 +- .../core/controller/test/PackagesApiTests.scala | 1 - .../core/dispatcher/test/DispatcherTests.scala | 56 +++--- .../test/InvokerSupervisionTests.scala | 9 +- 18 files changed, 508 insertions(+), 337 deletions(-) diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 1042030..a138781 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -24,9 +24,7 @@ import scala.collection.JavaConversions.seqAsJavaList import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.concurrent.duration.FiniteDuration -import scala.util.Try -import org.apache.kafka.clients.consumer.CommitFailedException import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -62,34 +60,8 @@ class KafkaConsumerConnector( */ def commit() = consumer.commitSync() - override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit) = { - val self = this - val thread = new Thread() { - override def run() = { - while (!disconnect) { - Try { - // Grab next batch of messages and commit offsets immediately - // It won't be processed twice (tested in "KafkaConnectorTests") - val messages = peek() - commit() - messages - } map { - _.foreach { process.tupled(_) } - } recover { - case e: CommitFailedException => logging.error(self, s"failed to commit to kafka: ${e.getMessage}") - case e: Throwable => logging.error(self, s"exception while pulling new records: ${e.getMessage}") - } - } - logging.warn(self, "consumer stream terminated") - consumer.close() - } - } - thread.start() - } - override def close() = { logging.info(this, s"closing '$topic' consumer") - disconnect = true } private def getProps: Properties = { @@ -122,5 +94,4 @@ class KafkaConsumerConnector( } private val consumer = getConsumer(getProps, Some(List(topic))) - private var disconnect = false } diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index 3491c21..82c797d 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -32,7 +32,7 @@ import whisk.core.entity.WhiskActivation /** Basic trait for messages that are sent on a message bus connector. */ trait Message { /** - * A transaction id to attach to the message. If not defined, defaults to 'dontcare' value. + * A transaction id to attach to the message. */ val transid = TransactionId.unknown diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala index 359cddb..87353df 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala @@ -17,7 +17,18 @@ package whisk.core.connector -import scala.concurrent.duration.Duration +import scala.annotation.tailrec +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure + +import org.apache.kafka.clients.consumer.CommitFailedException + +import akka.actor.FSM +import akka.pattern.pipe +import whisk.common.Logging +import whisk.common.TransactionId trait MessageConsumer { @@ -38,15 +49,180 @@ trait MessageConsumer { * Commits offsets from last peek operation to ensure they are removed * from the connector. */ - def commit() - - /** - * Calls process for every message received. Process receives a tuple - * (topic, partition, offset, and message as byte array). - */ - def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit + def commit(): Unit /** Closes consumer. */ def close(): Unit } + +object MessageFeed { + protected sealed trait FeedState + protected[connector] case object Idle extends FeedState + protected[connector] case object FillingPipeline extends FeedState + protected[connector] case object DrainingPipeline extends FeedState + + protected sealed trait FeedData + private case object NoData extends FeedData + + /** Indicates the consumer is ready to accept messages from the message bus for processing. */ + object Ready + + /** Steady state message, indicates capacity in downstream process to receive more messages. */ + object Processed + + /** Indicates the fill operation has completed. */ + private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])]) +} + +/** + * This actor polls the message bus for new messages and dispatches them to the given + * handler. The actor tracks the number of messages dispatched and will not dispatch new + * messages until some number of them are acknowledged. + * + * This is used by the invoker to pull messages from the message bus and apply back pressure + * when the invoker does not have resources to complete processing messages (i.e., no containers + * are available to run new actions). It is also used in the load balancer to consume active + * ack messages. + * When the invoker releases resources (by reclaiming containers) it will send a message + * to this actor which will then attempt to fill the pipeline with new messages. + * + * The actor tries to fill the pipeline with additional messages while the number + * of outstanding requests is below the pipeline fill threshold. + */ +@throws[IllegalArgumentException] +class MessageFeed( + description: String, + logging: Logging, + consumer: MessageConsumer, + maximumHandlerCapacity: Int, + longPollDuration: FiniteDuration, + handler: Array[Byte] => Future[Unit], + autoStart: Boolean = true, + logHandoff: Boolean = true) + extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] { + import MessageFeed._ + + // double-buffer to make up for message bus read overhead + val maxPipelineDepth = maximumHandlerCapacity * 2 + private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek + + require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth") + + private val outstandingMessages = mutable.Queue[(String, Int, Long, Array[Byte])]() + private var handlerCapacity = maximumHandlerCapacity + + private implicit val tid = TransactionId.dispatcher + + logging.info(this, s"handler capacity = $maximumHandlerCapacity, pipeline fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth") + + when(Idle) { + case Event(Ready, _) => + fillPipeline() + goto(FillingPipeline) + + case _ => stay + } + + // wait for fill to complete, and keep filling if there is + // capacity otherwise wait to drain + when(FillingPipeline) { + case Event(Processed, _) => + updateHandlerCapacity() + sendOutstandingMessages() + stay + + case Event(FillCompleted(messages), _) => + outstandingMessages.enqueue(messages: _*) + sendOutstandingMessages() + + if (shouldFillQueue()) { + fillPipeline() + stay + } else { + goto(DrainingPipeline) + } + + case _ => stay + } + + when(DrainingPipeline) { + case Event(Processed, _) => + updateHandlerCapacity() + sendOutstandingMessages() + if (shouldFillQueue()) { + fillPipeline() + goto(FillingPipeline) + } else stay + + case _ => stay + } + + onTransition { case _ -> Idle => if (autoStart) self ! Ready } + startWith(Idle, MessageFeed.NoData) + initialize() + + private implicit val ec = context.system.dispatcher + + private def fillPipeline(): Unit = { + if (outstandingMessages.size <= pipelineFillThreshold) { + Future { + // Grab next batch of messages and commit offsets immediately + // essentially marking the activation as having satisfied "at most once" + // semantics (this is the point at which the activation is considered started). + // If the commit fails, then messages peeked are peeked again on the next poll. + // While the commit is synchronous and will block until it completes, at steady + // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency + // of the commit should be masked. + val records = consumer.peek(longPollDuration) + consumer.commit() + FillCompleted(records.toSeq) + }.andThen { + case Failure(e: CommitFailedException) => logging.error(this, s"failed to commit $description consumer offset: $e") + case Failure(e: Throwable) => logging.error(this, s"exception while pulling new $description records: $e") + }.recover { + case _ => FillCompleted(Seq.empty) + }.pipeTo(self) + } else { + logging.error(this, s"dropping fill request until $description feed is drained") + } + } + + /** Send as many messages as possible to the handler. */ + @tailrec + private def sendOutstandingMessages(): Unit = { + val occupancy = outstandingMessages.size + if (occupancy > 0 && handlerCapacity > 0) { + val (topic, partition, offset, bytes) = outstandingMessages.dequeue() + + if (logHandoff) logging.info(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)") + handler(bytes) + handlerCapacity -= 1 + + sendOutstandingMessages() + } + } + + private def shouldFillQueue(): Boolean = { + val occupancy = outstandingMessages.size + if (occupancy <= pipelineFillThreshold) { + logging.debug(this, s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold ($handlerCapacity)") + true + } else { + logging.debug(this, s"$description pipeline must drain: $occupancy > $pipelineFillThreshold") + false + } + } + + private def updateHandlerCapacity(): Int = { + logging.debug(self, s"$description received processed msg, current capacity = $handlerCapacity") + + if (handlerCapacity < maximumHandlerCapacity) { + handlerCapacity += 1 + handlerCapacity + } else { + if (handlerCapacity > maximumHandlerCapacity) logging.error(self, s"$description capacity already at max") + maximumHandlerCapacity + } + } +} diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index d7e9025..79eb76b 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -37,30 +37,20 @@ import akka.actor.FSM.Transition import akka.actor.Props import akka.pattern.pipe import akka.util.Timeout + import spray.json._ import spray.json.DefaultJsonProtocol._ + import whisk.common.AkkaLogging import whisk.common.ConsulKV.LoadBalancerKeys import whisk.common.KeyValueStore import whisk.common.LoggingMarkers import whisk.common.RingBuffer import whisk.common.TransactionId -import whisk.core.connector.ActivationMessage -import whisk.core.connector.MessageConsumer -import whisk.core.connector.PingMessage +import whisk.core.connector._ import whisk.core.entitlement.Privilege.Privilege import whisk.core.entity.ActivationId.ActivationIdGenerator -import whisk.core.entity.AuthKey -import whisk.core.entity.CodeExecAsString -import whisk.core.entity.DocRevision -import whisk.core.entity.EntityName -import whisk.core.entity.ExecManifest -import whisk.core.entity.Identity -import whisk.core.entity.InstanceId -import whisk.core.entity.Secret -import whisk.core.entity.Subject -import whisk.core.entity.UUID -import whisk.core.entity.WhiskAction +import whisk.core.entity._ // Received events case object GetStatus @@ -151,13 +141,23 @@ class InvokerPool( } /** Receive Ping messages from invokers. */ - pingConsumer.onMessage((topic, _, _, bytes) => { + val pingPollDuration = 1.second + val invokerPingFeed = context.system.actorOf(Props { + new MessageFeed("ping", logging, pingConsumer, pingConsumer.maxPeek, pingPollDuration, processInvokerPing, logHandoff = false) + }) + + def processInvokerPing(bytes: Array[Byte]): Future[Unit] = Future { val raw = new String(bytes, StandardCharsets.UTF_8) PingMessage.parse(raw) match { - case Success(p: PingMessage) => self ! p - case Failure(t) => logging.error(this, s"failed processing message: $raw with $t") + case Success(p: PingMessage) => + self ! p + invokerPingFeed ! MessageFeed.Processed + + case Failure(t) => + invokerPingFeed ! MessageFeed.Processed + logging.error(this, s"failed processing message: $raw with $t") } - }) + } } object InvokerPool { diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 6534307..63655c9 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -33,8 +33,10 @@ import org.apache.kafka.clients.producer.RecordMetadata import akka.actor.ActorRefFactory import akka.actor.ActorSystem +import akka.actor.Props import akka.pattern.ask import akka.util.Timeout + import whisk.common.ConsulClient import whisk.common.Logging import whisk.common.LoggingMarkers @@ -44,6 +46,7 @@ import whisk.connector.kafka.KafkaProducerConnector import whisk.core.WhiskConfig import whisk.core.WhiskConfig._ import whisk.core.connector.{ ActivationMessage, CompletionMessage } +import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer import whisk.core.database.NoDocumentException import whisk.core.entity.{ ActivationId, WhiskAction, WhiskActivation } @@ -81,7 +84,13 @@ trait LoadBalancer { } -class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer { +class LoadBalancerService( + config: WhiskConfig, + instance: InstanceId, + entityStore: EntityStore)( + implicit val actorSystem: ActorSystem, + logging: Logging) + extends LoadBalancer { /** The execution context for futures */ implicit val executionContext: ExecutionContext = actorSystem.dispatcher @@ -224,9 +233,10 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore throw new IllegalStateException("cannot create test action for invoker health because runtime manifest is not valid") } + val maxPingsPerPoll = 128 val consul = new ConsulClient(config.consulServer) // Each controller gets its own Group Id, to receive all messages - val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health") + val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll) val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name) actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => { @@ -235,22 +245,33 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore }, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer)) } - /** Subscribes to active acks (completion messages from the invokers). */ - private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}") + /** + * Subscribes to active acks (completion messages from the invokers), and + * registers a handler for received active acks from invokers. + */ + val maxActiveAcksPerPoll = 128 + val activeAckPollDuration = 1.second - /** Registers a handler for received active acks from invokers. */ - activeAckConsumer.onMessage((topic, _, _, bytes) => { + private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll) + val activationFeed = actorSystem.actorOf(Props { + new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck) + }) + + def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future { val raw = new String(bytes, StandardCharsets.UTF_8) CompletionMessage.parse(raw) match { case Success(m: CompletionMessage) => processCompletion(m.response, m.transid, false) // treat left as success (as it is the result a the message exceeding the bus limit) val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError) + activationFeed ! MessageFeed.Processed invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess) - case Failure(t) => logging.error(this, s"failed processing message: $raw with $t") + case Failure(t) => + activationFeed ! MessageFeed.Processed + logging.error(this, s"failed processing message: $raw with $t") } - }) + } /** Return a sorted list of available invokers. */ private def availableInvokers: Future[Seq[String]] = invokerHealth.map { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 9a796ee..f1f51f5 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -24,12 +24,13 @@ import akka.actor.ActorRef import akka.actor.ActorRefFactory import akka.actor.Props import whisk.common.AkkaLogging -import whisk.core.dispatcher.ActivationFeed.ContainerReleased + import whisk.core.entity.ByteSize import whisk.core.entity.CodeExec import whisk.core.entity.EntityName import whisk.core.entity.ExecutableWhiskAction import whisk.core.entity.size._ +import whisk.core.connector.MessageFeed sealed trait WorkerState case object Busy extends WorkerState @@ -124,7 +125,7 @@ class ContainerPool( // Activation completed case ActivationCompleted => - feed ! ContainerReleased + feed ! MessageFeed.Processed } /** Creates a new container and updates state accordingly. */ diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala deleted file mode 100644 index 5ea54e7..0000000 --- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.dispatcher - -import scala.concurrent.duration.FiniteDuration -import scala.util.Try - -import org.apache.kafka.clients.consumer.CommitFailedException - -import akka.actor.Actor -import akka.actor.actorRef2Scala -import whisk.common.Logging -import whisk.common.TransactionId -import whisk.core.connector.MessageConsumer - -object ActivationFeed { - sealed class ActivationNotification - - /** Pulls new messages from the message bus. */ - case class FillQueueWithMessages() - - /** Indicates resources are available because transaction completed, may cause pipeline fill. */ - case object ContainerReleased extends ActivationNotification - - /** Indicate resources are available because transaction failed, may cause pipeline fill. */ - case class FailedActivation(tid: TransactionId) extends ActivationNotification -} - -/** - * This actor polls the message bus for new messages and dispatches them to the given - * handler. The actor tracks the number of messages dispatched and will not dispatch new - * messages until some number of them are acknowledged. - * - * This is used by the invoker to pull messages from the message bus and apply back pressure - * when the invoker does not have resources to complete processing messages (i.e., no containers - * are available to run new actions). - * - * When the invoker releases resources (by reclaiming containers) it will send a message - * to this actor which will then attempt to fill the pipeline with new messages. - * - * The actor tries to fill the pipeline with additional messages while the number - * of outstanding requests is below the pipeline fill threshold. - */ -@throws[IllegalArgumentException] -protected class ActivationFeed( - logging: Logging, - consumer: MessageConsumer, - maxPipelineDepth: Int, - longpollDuration: FiniteDuration, - handler: (String, Array[Byte]) => Any) - extends Actor { - import ActivationFeed.ActivationNotification - import ActivationFeed.FillQueueWithMessages - - require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth") - - private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek - private var pipelineOccupancy = 0 - private implicit val tid = TransactionId.dispatcher - - override def receive = { - case FillQueueWithMessages => - if (pipelineOccupancy <= pipelineFillThreshold) { - Try { - // Grab next batch of messages and commit offsets immediately - // essentially marking the activation as having satisfied "at most once" - // semantics (this is the point at which the activation is considered started). - // If the commit fails, then messages peeked are peeked again on the next poll. - // While the commit is synchronous and will block until it completes, at steady - // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency - // of the commit should be masked. - val records = consumer.peek(longpollDuration) - consumer.commit() - (records, records.size) - } map { - case (records, count) => - records foreach { - case (topic, partition, offset, bytes) => - pipelineOccupancy += 1 - logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]") - handler(topic, bytes) - } - } recover { - case e: CommitFailedException => logging.error(this, s"failed to commit consumer offset: $e") - case e: Throwable => logging.error(this, s"exception while pulling new records: $e") - } - fill() - } else logging.debug(this, "dropping fill request until feed is drained") - - case n: ActivationNotification => - pipelineOccupancy -= 1 - logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold") - if (pipelineOccupancy < 0) { - logging.error(this, "pipelineOccupancy<0") - } - fill() - } - - private def fill() = { - if (pipelineOccupancy <= pipelineFillThreshold) { - logging.debug(this, s"filling activation pipeline: $pipelineOccupancy <= $pipelineFillThreshold") - self ! FillQueueWithMessages - } else { - logging.info(this, s"waiting for activation pipeline to drain: $pipelineOccupancy > $pipelineFillThreshold") - } - } -} diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala index 565acbc..fa0e74d 100644 --- a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala +++ b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala @@ -17,6 +17,8 @@ package whisk.core.dispatcher +import java.nio.charset.StandardCharsets + import scala.collection.concurrent.TrieMap import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration @@ -28,9 +30,9 @@ import akka.actor.Props import akka.actor.actorRef2Scala import whisk.common.Counter import whisk.common.Logging -import whisk.common.TransactionId import whisk.core.connector.ActivationMessage import whisk.core.connector.MessageConsumer +import whisk.core.connector.MessageFeed /** * Creates a dispatcher that pulls messages from the message pub/sub connector. @@ -54,9 +56,10 @@ class Dispatcher( implicit logging: Logging) extends Registrar { - val activationFeed = actorSystem.actorOf(Props(new ActivationFeed(logging, consumer, maxPipelineDepth, pollDuration, process))) + // create activation request feed but do not start it, until the invoker is registered + val activationFeed = actorSystem.actorOf(Props(new MessageFeed("activation", logging, consumer, maxPipelineDepth, pollDuration, process, autoStart = false))) - def start() = activationFeed ! ActivationFeed.FillQueueWithMessages + def start() = activationFeed ! MessageFeed.Ready def stop() = consumer.close() /** @@ -67,8 +70,8 @@ class Dispatcher( * A handler is registered via addHandler and unregistered via removeHandler. * There is typically only one handler. */ - def process(topic: String, bytes: Array[Byte]) = { - val raw = new String(bytes, "utf-8") + def process(bytes: Array[Byte]): Future[Unit] = Future { + val raw = new String(bytes, StandardCharsets.UTF_8) ActivationMessage.parse(raw) match { case Success(m) => handlers foreach { @@ -78,9 +81,8 @@ class Dispatcher( } } - private def handleMessage(handler: MessageHandler, msg: ActivationMessage) = { + private def handleMessage(handler: MessageHandler, msg: ActivationMessage): Unit = { implicit val tid = msg.transid - implicit val executionContext = actorSystem.dispatcher Future { val count = counter.next() @@ -92,17 +94,13 @@ class Dispatcher( } } - private def inform(matchers: TrieMap[String, MessageHandler])(implicit transid: TransactionId) = { - val names = matchers map { _._2.name } reduce (_ + "," + _) - logging.debug(this, s"matching message to ${matchers.size} handlers: $names") - matchers - } - - private def errorMsg(handler: MessageHandler, e: Throwable): String = + private def errorMsg(handler: MessageHandler, e: Throwable): String = { s"failed applying handler '${handler.name}': ${errorMsg(e)}" + } - private def errorMsg(msg: String, e: Throwable) = + private def errorMsg(msg: String, e: Throwable): String = { s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}" + } private def errorMsg(e: Throwable): String = { if (e.isInstanceOf[java.util.concurrent.ExecutionException]) { @@ -113,6 +111,7 @@ class Dispatcher( } private val counter = new Counter() + private implicit val executionContext = actorSystem.dispatcher } trait Registrar { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 58592e7..239370f 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -27,6 +27,8 @@ import scala.language.postfixOps import scala.util.{ Failure, Success } import scala.util.Try +import org.apache.kafka.common.errors.RecordTooLargeException + import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala } import akka.japi.Creator import spray.json._ @@ -38,24 +40,18 @@ import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector } import whisk.core.WhiskConfig import whisk.core.WhiskConfig.{ consulServer, dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, whiskVersion, invokerUseReactivePool } import whisk.core.connector.{ ActivationMessage, CompletionMessage } +import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer import whisk.core.connector.PingMessage import whisk.core.container._ import whisk.core.dispatcher.{ Dispatcher, MessageHandler } -import whisk.core.dispatcher.ActivationFeed.{ ActivationNotification, ContainerReleased, FailedActivation } import whisk.core.entity._ import whisk.http.BasicHttpService import whisk.http.Messages import whisk.utils.ExecutionContextFactory -import whisk.common.Scheduler -import whisk.core.connector.PingMessage -import scala.util.Try -import whisk.core.connector.MessageProducer -import org.apache.kafka.common.errors.RecordTooLargeException -import whisk.core.entity.TimeLimit /** - * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke". + * A message handler that invokes actions as directed by message on topic "/actions/invoke". * The message path must contain a fully qualified action name and an optional revision id. * * @param config the whisk configuration @@ -119,7 +115,7 @@ class Invoker( case Success(activation) => transactionPromise.completeWith { // this completes the successful activation case (1) - completeTransaction(tran, activation, ContainerReleased) + completeTransaction(tran, activation) } case Failure(t) => @@ -164,7 +160,7 @@ class Invoker( // send activate ack for failed activations sendActiveAck(tran, activationResult) - completeTransaction(tran, activationResult, FailedActivation(transid)) + completeTransaction(tran, activationResult) } /* @@ -173,7 +169,7 @@ class Invoker( * Invariant: Only one call to here succeeds. Even though the sync block wrap WhiskActivation.put, * it is only blocking this transaction which is finishing anyway. */ - protected def completeTransaction(tran: Transaction, activation: WhiskActivation, releaseResource: ActivationNotification)( + protected def completeTransaction(tran: Transaction, activation: WhiskActivation)( implicit transid: TransactionId): Future[DocInfo] = { tran.synchronized { tran.result match { @@ -183,7 +179,7 @@ class Invoker( // Send a message to the activation feed indicating there is a free resource to handle another activation. // Since all transaction completions flow through this method and the invariant is that the transaction is // completed only once, there is only one completion message sent to the feed as a result. - activationFeed ! releaseResource + activationFeed ! MessageFeed.Processed // Since there is no active action taken for completion from the invoker, writing activation record is it. logging.info(this, "recording the activation result to the data store") val result = WhiskActivation.put(activationStore, activation) andThen { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index d620e3d..456182b 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -36,6 +36,7 @@ import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.connector.ActivationMessage import whisk.core.connector.CompletionMessage +import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer import whisk.core.container.{ ContainerPool => OldContainerPool } import whisk.core.container.Interval @@ -47,13 +48,13 @@ import whisk.core.containerpool.docker.DockerClientWithFileAccess import whisk.core.containerpool.docker.DockerContainer import whisk.core.containerpool.docker.RuncClient import whisk.core.database.NoDocumentException -import whisk.core.dispatcher.ActivationFeed.FailedActivation import whisk.core.dispatcher.MessageHandler import whisk.core.entity._ import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ import whisk.http.Messages + class InvokerReactive( config: WhiskConfig, instance: InstanceId, @@ -206,7 +207,7 @@ class InvokerReactive( Parameters("path", msg.action.toString.toJson) ++ causedBy }) - activationFeed ! FailedActivation(msg.transid) + activationFeed ! MessageFeed.Processed ack(msg.transid, activation, msg.rootControllerIndex) store(msg.transid, activation) } diff --git a/docs/actions.md b/docs/actions.md index 7dcae8d..b479bc9 100644 --- a/docs/actions.md +++ b/docs/actions.md @@ -640,7 +640,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and ``` docker run --rm -it -v "$(pwd):/owexec" openwhisk/action-swift-v3.1.1 bash ``` - This puts you in a bash shell within the Docker container. + This puts you in a bash shell within the Docker container. - Copy the source code and prepare to build it. ``` @@ -658,7 +658,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and - (Optional) Create the `Package.swift` file to add dependencies. ```swift import PackageDescription - + let package = Package( name: "Action", dependencies: [ @@ -699,7 +699,7 @@ and so you should include them in your own `Package.swift`. exit ``` - This has created hello.zip in the same directory as hello.swift. + This has created hello.zip in the same directory as hello.swift. - Upload it to OpenWhisk with the action name helloSwifty: ``` @@ -731,6 +731,7 @@ For example, create a Java file called `Hello.java` with the following content: ```java import com.google.gson.JsonObject; + public class Hello { public static JsonObject main(JsonObject args) { String name = "stranger"; diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala index 60bb3d3..f73e554 100644 --- a/tests/src/test/scala/services/KafkaConnectorTests.scala +++ b/tests/src/test/scala/services/KafkaConnectorTests.scala @@ -21,10 +21,8 @@ import java.util.Calendar import scala.concurrent.Await import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration import scala.language.postfixOps -import org.apache.commons.lang3.StringUtils import org.apache.kafka.clients.consumer.CommitFailedException import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll @@ -34,14 +32,12 @@ import org.scalatest.junit.JUnitRunner import common.StreamLogging import common.WskActorSystem -import whisk.common.Logging import whisk.common.TransactionId import whisk.connector.kafka.KafkaConsumerConnector import whisk.connector.kafka.KafkaProducerConnector import whisk.core.WhiskConfig import whisk.core.connector.Message import whisk.utils.ExecutionContextFactory -import whisk.utils.retry @RunWith(classOf[JUnitRunner]) class KafkaConnectorTests @@ -61,7 +57,7 @@ class KafkaConnectorTests val sessionTimeout = 10 seconds val maxPollInterval = 10 seconds val producer = new KafkaProducerConnector(config.kafkaHost, ec) - val consumer = new TestKafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) + val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) override def afterAll() { producer.close() @@ -116,52 +112,4 @@ class KafkaConnectorTests } else consumer.commit() } } - - it should "catch a failing commit" in { - val messageReceived = "message received" - consumer.onMessage((topic, partition, offset, bytes) => { - printstream.println(messageReceived) - }) - val message = new Message { override val serialize = Calendar.getInstance().getTime().toString } - - // Send message while commit throws no exception -> Should be processed - consumer.commitFails = false - Await.result(producer.send(topic, message), 10 seconds) - retry(stream.toString should include(messageReceived), 20, Some(500 millisecond)) - - // Send message while commit throws exception -> Message will not be processed - consumer.commitFails = true - retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond)) - Await.result(producer.send(topic, message), 10 seconds) - retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond)) - - // Send message again -> No commit exception -> Should work again - consumer.commitFails = false - Await.result(producer.send(topic, message), 10 seconds) - retry(StringUtils.countMatches(stream.toString, messageReceived) should be(2), 50, Some(100 milliseconds)) - - // Wait a few seconds and ensure that the message is not processed three times - Thread.sleep(5000) - StringUtils.countMatches(stream.toString, messageReceived) should be(2) - } - -} - -class TestKafkaConsumerConnector( - kafkahost: String, - groupid: String, - topic: String, - sessionTimeout: FiniteDuration, - maxPollInterval: FiniteDuration)(implicit logging: Logging) extends KafkaConsumerConnector( - kafkahost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) { - - override def commit() = { - if (commitFails) { - throw new CommitFailedException() - } else { - super.commit() - } - } - - var commitFails = false } diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala new file mode 100644 index 0000000..030a6d0 --- /dev/null +++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala @@ -0,0 +1,191 @@ +/* + * Copyright 2015-2016 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.connector.test + +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.Buffer +import scala.concurrent.Future +import scala.concurrent.duration._ + +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.FlatSpecLike +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.FSM +import akka.actor.FSM.CurrentState +import akka.actor.FSM.SubscribeTransitionCallBack +import akka.actor.FSM.Transition +import akka.actor.PoisonPill +import akka.actor.Props +import akka.testkit.TestKit +import common.StreamLogging +import whisk.core.connector._ +import whisk.core.connector.MessageFeed._ +import whisk.utils.retry + +@RunWith(classOf[JUnitRunner]) +class MessageFeedTests + extends FlatSpecLike + with Matchers + with BeforeAndAfterEach + with BeforeAndAfterAll + with MockFactory + with StreamLogging { + + val system = ActorSystem("MessageFeedTestSystem") + val actorsToDestroyAfterEach: Buffer[ActorRef] = Buffer.empty + + override def afterEach() = actorsToDestroyAfterEach.foreach { _ ! PoisonPill } + override def afterAll() = TestKit.shutdownActorSystem(system) + + case class Connector(autoStart: Boolean = true) extends TestKit(system) { + val peekCount = new AtomicInteger() + + val consumer = new TestConnector("feedtest", 4, true) { + override def peek(duration: Duration) = { + peekCount.incrementAndGet() + super.peek(duration) + } + } + + val sentCount = new AtomicInteger() + + def fill(n: Int) = { + val msgs = (1 to n).map { _ => + new Message { + override def serialize = { + sentCount.incrementAndGet().toString + } + override def toString = { + s"message${sentCount.get}" + } + } + } + consumer.send(msgs) + } + + val receivedCount = new AtomicInteger() + + def handler(bytes: Array[Byte]): Future[Unit] = { + Future.successful(receivedCount.incrementAndGet()) + } + + val fsm = childActorOf(Props(new MessageFeed( + "test", + logging, + consumer, + consumer.maxPeek, + 200.milliseconds, + handler, + autoStart))) + + actorsToDestroyAfterEach += (fsm, testActor) + + def monitorTransitionsAndStart() = { + fsm ! SubscribeTransitionCallBack(testActor) + expectMsg(CurrentState(fsm, Idle)) + fsm ! Ready + expectMsg(Transition(fsm, Idle, FillingPipeline)) + this + } + } + + def timeout(actor: ActorRef) = actor ! FSM.StateTimeout + + it should "wait for ready before accepting messages" in { + val connector = Connector(autoStart = false) + connector.fsm ! SubscribeTransitionCallBack(connector.testActor) + + // start idle + connector.expectMsg(CurrentState(connector.fsm, Idle)) + + // stay until received ready + connector.fsm ! FSM.StateTimeout // should be ignored + connector.fsm ! Processed // should be ignored + Thread.sleep(500.milliseconds.toMillis) + connector.peekCount.get shouldBe 0 + + // start filling + connector.fsm ! Ready + connector.expectMsg(Transition(connector.fsm, Idle, FillingPipeline)) + retry(connector.peekCount.get should be > 0) + } + + it should "auto start and start polling for messages" in { + val connector = Connector(autoStart = true) + // automatically start filling + retry(connector.peekCount.get should be > 0, 5, Some(200.milliseconds)) + } + + it should "stop polling for messages when the pipeline is full" in { + val connector = Connector(autoStart = false).monitorTransitionsAndStart() + // push enough to cause pipeline to exceed fill mark + connector.fill(connector.consumer.maxPeek * 2 + 1) + retry(connector.peekCount.get should be > 0) + retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds)) + + val peeks = connector.peekCount.get + connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline)) + + connector.peekCount.get shouldBe peeks + connector.expectNoMsg(500.milliseconds) + } + + it should "transition from drain to fill mode" in { + val connector = Connector(autoStart = false).monitorTransitionsAndStart() + println(connector.fsm.toString()) + // push enough to cause pipeline to exceed fill mark + val sendCount = connector.consumer.maxPeek * 2 + 2 + connector.fill(sendCount) + retry(connector.peekCount.get should be > 0) + retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds)) + + val peeks = connector.peekCount.get + connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline)) + + // stay in drain mode, no more peeking + timeout(connector.fsm) // should be ignored + connector.expectNoMsg(500.milliseconds) + connector.peekCount.get shouldBe peeks // no new reads + + // expecting overflow of 2 in the queue, which is true if all expected messages were sent + retry(connector.sentCount.get shouldBe sendCount, 5, Some(200.milliseconds)) + + // drain one, should stay in draining state + connector.fsm ! Processed + connector.expectNoMsg(500.milliseconds) + connector.peekCount.get shouldBe peeks // no new reads + + // back to fill mode + connector.fsm ! Processed + connector.expectMsg(Transition(connector.fsm, DrainingPipeline, FillingPipeline)) + retry(connector.peekCount.get should be >= (peeks + 1)) + + // should send back to drain mode + connector.fill(1) + connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline)) + + connector.expectNoMsg(500.milliseconds) + } +} diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala similarity index 78% rename from tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala rename to tests/src/test/scala/whisk/core/connector/test/TestConnector.scala index 9e7fe34..b454c7c 100644 --- a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala +++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala @@ -15,20 +15,20 @@ * limitations under the License. */ -package whisk.core.dispatcher.test +package whisk.core.connector.test import java.util.ArrayList import java.util.concurrent.LinkedBlockingQueue -import scala.collection.JavaConversions.asScalaBuffer import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.collection.JavaConversions._ import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.Record - import common.StreamLogging + import whisk.common.Counter import whisk.core.connector.Message import whisk.core.connector.MessageConsumer @@ -44,7 +44,6 @@ class TestConnector( override def peek(duration: Duration) = { val msgs = new ArrayList[Message] queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek) - msgs map { m => offset += 1 (topic, -1, offset, m.serialize.getBytes) @@ -59,23 +58,17 @@ class TestConnector( } } - override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit = { - new Thread { - override def run() = while (!closed) { - val msg = queue.take() - logging.info(this, s"received message for '$topic'") - process(topic, -1, -1, msg.serialize.getBytes) - Thread.sleep(100) // let producer get in a send if any - } - }.start - } - def occupancy = queue.size def send(msg: Message): Future[RecordMetadata] = { producer.send(topic, msg) } + def send(msgs: Seq[Message]): Future[RecordMetadata] = { + import scala.language.reflectiveCalls + producer.sendBulk(topic, msgs) + } + def close() = { closed = true producer.close() @@ -91,12 +84,23 @@ class TestConnector( Future.failed(new IllegalStateException("failed to write msg")) } } + + def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = { + if (queue.addAll(msgs)) { + logging.info(this, s"put: ${msgs.length} messages") + Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1)) + } else { + logging.error(this, s"put failed: ${msgs.length} messages") + Future.failed(new IllegalStateException("failed to write msg")) + } + } + def close() = {} def sentCount() = counter.next() val counter = new Counter() } - protected[test] var throwCommitException = false + var throwCommitException = false private val queue = new LinkedBlockingQueue[Message]() @volatile private var closed = false private var offset = -1L diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index f7bcdf1..866d370 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -38,11 +38,11 @@ import akka.testkit.TestProbe import whisk.common.TransactionId import whisk.core.connector.ActivationMessage import whisk.core.containerpool._ -import whisk.core.dispatcher.ActivationFeed.ContainerReleased import whisk.core.entity._ import whisk.core.entity.ExecManifest.RuntimeManifest import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ +import whisk.core.connector.MessageFeed /** * Behavior tests for the ContainerPool @@ -114,7 +114,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) val pool = system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref)) containers(0).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) } /* @@ -159,7 +159,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) containers(0).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) pool ! runMessageDifferentEverything containers(0).expectMsg(Remove) containers(1).expectMsg(runMessageDifferentEverything) @@ -177,14 +177,14 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData(lastUsed = Instant.EPOCH))) containers(0).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) // Run the second container, don't remove the first one pool ! runMessageDifferentEverything containers(1).expectMsg(runMessageDifferentEverything) containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now))) containers(1).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) pool ! runMessageDifferentNamespace containers(2).expectMsg(runMessageDifferentNamespace) @@ -203,7 +203,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) containers(0).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) pool ! runMessageDifferentNamespace containers(0).expectMsg(Remove) containers(1).expectMsg(runMessageDifferentNamespace) @@ -219,7 +219,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) containers(0).send(pool, ActivationCompleted) - feed.expectMsg(ContainerReleased) + feed.expectMsg(MessageFeed.Processed) pool ! runMessage containers(0).expectMsg(runMessage) pool ! runMessage //expect this message to be requeued since previous is incomplete. diff --git a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala index d6d74c6..6488156 100644 --- a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala @@ -519,7 +519,6 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi { deletePackage(reference.docid) status should be(OK) val response = responseAs[WhiskPackage] - println(responseAs[String]) response should be { WhiskPackage(reference.namespace, reference.name, reference.binding, version = reference.version.upPatch, diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala index 3084270..aafd64f 100644 --- a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala +++ b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala @@ -35,7 +35,8 @@ import spray.json.JsNumber import spray.json.JsObject import whisk.common.TransactionId import whisk.core.connector.{ ActivationMessage => Message } -import whisk.core.dispatcher.ActivationFeed +import whisk.core.connector.MessageFeed +import whisk.core.connector.test.TestConnector import whisk.core.dispatcher.Dispatcher import whisk.core.dispatcher.MessageHandler import whisk.core.entity._ @@ -79,12 +80,12 @@ class DispatcherTests } it should "send and receive a message from connector bus" in { - val maxdepth = 8 - val half = maxdepth / 2 - val connector = new TestConnector("test connector", maxdepth / 2, true) + val capacity = 4 + val connector = new TestConnector("test connector", capacity, false) + val messagesProcessed = new AtomicInteger() val handler = new TestRule({ msg => messagesProcessed.incrementAndGet() }) - val dispatcher = new Dispatcher(connector, 100 milliseconds, maxdepth, actorSystem) + val dispatcher = new Dispatcher(connector, 100 milliseconds, capacity, actorSystem) dispatcher.addHandler(handler, true) dispatcher.start() @@ -94,72 +95,57 @@ class DispatcherTests Console.withErr(stream) { retry({ val logs = stream.toString() - logs should include regex (s"exception while pulling new records *.* commit failed") + logs should include regex (s"exception while pulling new activation records *.* commit failed") }, 10, Some(100 milliseconds)) connector.throwCommitException = false } } - for (i <- 0 to half) { + for (i <- 0 until (2 * capacity + 1)) { sendMessage(connector, i + 1) } - // wait until all messages are received at which point the - // dispatcher cannot drain anymore messages - withClue("the queue should be empty since all messages are drained") { - retry({ - connector.occupancy shouldBe 0 - }, 10, Some(100 milliseconds)) - } - + // only process as many messages as we have downstream capacity withClue("messages processed") { retry({ - messagesProcessed.get should be(half + 1) + messagesProcessed.get should be(capacity) }, 20, Some(100 milliseconds)) } withClue("confirming dispatcher is in overflow state") { val logs = stream.toString() - logs should include regex (s"waiting for activation pipeline to drain: ${half + 1} > $half") + logs should include regex (s"activation pipeline must drain: ${capacity + 1} > $capacity") } // send one message and check later that it remains in the connector - // at this point, total messages sent = half + 2 - sendMessage(connector, half + 2) - - withClue("confirming dispatcher will not consume additional messages when in overflow state") { - stream.reset() - Console.withOut(stream) { - dispatcher.activationFeed ! ActivationFeed.FillQueueWithMessages - retry({ - val logs = stream.toString() - logs should include regex (s"dropping fill request until feed is drained") - logs should not include regex(s"waiting for activation pipeline to drain: ${messagesProcessed.get} > $half") - }, 10, Some(100 milliseconds)) - } - } + // at this point, total messages sent = 2 * capacity + 2 + connector.occupancy shouldBe 0 + sendMessage(connector, 2 * capacity + 2) + Thread.sleep(1.second.toMillis) withClue("expecting message to still be in the queue") { - connector.occupancy shouldBe 1 + retry({ + connector.occupancy shouldBe 1 + }, 10, Some(100 milliseconds)) } // unblock the pipeline by draining 1 activations and check // that dispatcher refilled the pipeline stream.reset() Console.withOut(stream) { - dispatcher.activationFeed ! ActivationFeed.ContainerReleased + dispatcher.activationFeed ! MessageFeed.Processed // wait until additional message is drained retry({ withClue("additional messages processed") { - messagesProcessed.get shouldBe half + 2 + messagesProcessed.get shouldBe capacity + 1 } }, 10, Some(100 milliseconds)) } withClue("confirm dispatcher tried to fill the pipeline") { val logs = stream.toString() - logs should include regex (s"filling activation pipeline: $half <= $half") + logs should include regex (s"activation pipeline has capacity: $capacity <= $capacity") } } finally { dispatcher.stop() diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index d3feabe..eaa0278 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -50,7 +50,6 @@ import whisk.common.KeyValueStore import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.connector.ActivationMessage -import whisk.core.connector.MessageConsumer import whisk.core.connector.PingMessage import whisk.core.entitlement.Privilege.Privilege import whisk.core.entity.ActivationId.ActivationIdGenerator @@ -75,6 +74,7 @@ import whisk.core.loadBalancer.InvokerState import whisk.core.loadBalancer.Offline import whisk.core.loadBalancer.UnHealthy import whisk.utils.retry +import whisk.core.connector.test.TestConnector @RunWith(classOf[JUnitRunner]) class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) @@ -101,6 +101,8 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) /** Queries all invokers for their state */ def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[Map[String, InvokerState]], timeout.duration) + val pC = new TestConnector("pingFeedTtest", 4, false) {} + behavior of "InvokerPool" it should "successfully create invokers in its pool on ping and keep track of statechanges" in { @@ -114,7 +116,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) val kv = stub[KeyValueStore] val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]] - val pC = stub[MessageConsumer] + val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC)) within(timeout.duration) { @@ -156,7 +158,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) val kv = stub[KeyValueStore] val callback = stubFunction[String, Unit] val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]] - val pC = stub[MessageConsumer] val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, callback, sendActivationToInvoker, pC)) within(timeout.duration) { @@ -186,7 +187,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) val childFactory = (f: ActorRefFactory, name: String) => invoker.ref val kv = stub[KeyValueStore] val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]] - val pC = stub[MessageConsumer] val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC)) @@ -213,7 +213,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) val kv = stub[KeyValueStore] val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]] - val pC = stub[MessageConsumer] val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC)) -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].