This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new ed39c03 Fix activation feed book-keeping in reactive pool (#2319) ed39c03 is described below commit ed39c03a0b9c1f5ab96da68cde1448e2ccfeb8d5 Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Tue Jun 6 07:23:44 2017 +0200 Fix activation feed book-keeping in reactive pool (#2319) The old mechanism piggybacked on the `NeedWork` message to also claim free resources to the ActivationFeed. That is incomplete in that this won’t signal free resources for various failure scenarios. This decouples the `NeedWork` signal from the `ActivationComplete` signal to be able to pull in new ActivationMessages from the ActivationFeed reliably. Fixes #2285. --- .../whisk/core/containerpool/ContainerPool.scala | 17 ++-- .../whisk/core/containerpool/ContainerProxy.scala | 91 +++++++++++++++------- .../whisk/core/dispatcher/ActivationFeed.scala | 8 +- .../scala/whisk/core/invoker/InvokerReactive.scala | 4 +- .../containerpool/test/ContainerPoolTests.scala | 7 +- .../containerpool/test/ContainerProxyTests.scala | 6 +- 6 files changed, 91 insertions(+), 42 deletions(-) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 2ef3bae..1af51bf 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -51,7 +51,7 @@ case class WorkerData(data: ContainerData, state: WorkerState) * Prewarm containers are only used, if they have matching arguments * (kind, memory) and there is space in the pool. 
* - * @param childFactory method to create new containers + * @param childFactory method to create new container proxy actors * @param maxPoolSize maximum size of containers allowed in the pool * @param feed actor to request more work from * @param prewarmConfig optional settings for container prewarming @@ -95,7 +95,7 @@ class ContainerPool( pool.get(actor) match { case Some(w) => pool.update(actor, WorkerData(w.data, Busy)) - actor ! r + actor ! r // forwards the run request to the container case None => logging.error(this, "actor data not found") self ! r @@ -106,17 +106,16 @@ class ContainerPool( } // Container is free to take more work - case NeedWork(data: WarmedData) => - pool.update(sender(), WorkerData(data, Free)) - feed ! ContainerReleased + case NeedWork(data: WarmedData) => pool.update(sender(), WorkerData(data, Free)) // Container is prewarmed and ready to take work - case NeedWork(data: PreWarmedData) => - prewarmedPool.update(sender(), WorkerData(data, Free)) + case NeedWork(data: PreWarmedData) => prewarmedPool.update(sender(), WorkerData(data, Free)) // Container got removed - case ContainerRemoved => - pool.remove(sender()) + case ContainerRemoved => pool.remove(sender()) + + // Activation completed + case ActivationCompleted => feed ! ContainerReleased } /** Creates a new container and updates state accordingly. 
*/ diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index 025a448..4aa77a7 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -37,6 +37,8 @@ import whisk.core.entity._ import whisk.core.entity.size._ import whisk.common.Counter import whisk.core.entity.ExecManifest.ImageName +import whisk.common.AkkaLogging +import whisk.http.Messages // States sealed trait ContainerState @@ -64,6 +66,12 @@ case object Remove case class NeedWork(data: ContainerData) case object ContainerPaused case object ContainerRemoved +/** + * Indicates the container resource is now free to receive a new request. + * This message is sent to the parent which in turn notifies the feed that a + * resource slot is available. + */ +case object ActivationCompleted /** * A proxy that wraps a Container. 
It is used to keep track of the lifecycle @@ -88,6 +96,7 @@ class ContainerProxy( sendActiveAck: (TransactionId, WhiskActivation) => Future[Any], storeActivation: (TransactionId, WhiskActivation) => Future[Any]) extends FSM[ContainerState, ContainerData] with Stash { implicit val ec = context.system.dispatcher + val logging = new AkkaLogging(context.system.log) // The container is destroyed after this period of time val unusedTimeout = 10.minutes @@ -99,7 +108,7 @@ class ContainerProxy( startWith(Uninitialized, NoData()) when(Uninitialized) { - // pre warm a container + // pre warm a container (creates a stem cell container) case Event(job: Start, _) => factory( TransactionId.invokerWarmup, @@ -112,32 +121,49 @@ class ContainerProxy( goto(Starting) - // cold start + // cold start (no container to reuse or available stem cell container) case Event(job: Run, _) => implicit val transid = job.msg.transid - factory( + + // create a new container + val container = factory( job.msg.transid, ContainerProxy.containerName(job.msg.user.namespace.name, job.action.name.name), job.action.exec.image, job.action.exec.pull, job.action.limits.memory.megabytes.MB) - .andThen { - case Success(container) => self ! 
PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB) - case Failure(t) => - val response = t match { - case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg) - case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg) - case _ => ActivationResponse.whiskError(t.getMessage) - } - val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response) - sendActiveAck(transid, activation) - storeActivation(transid, activation) - } - .flatMap { - container => - initializeAndRun(container, job) - .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now)) - }.pipeTo(self) + + // container factory will either yield a new container ready to execute the action, or + // starting up the container failed; for the latter, it's either an internal error starting + // a container or a docker action that is not conforming to the required action API + container.andThen { + case Success(container) => + // the container is ready to accept an activation; register it as PreWarmed; this + // normalizes the life cycle for containers and their cleanup when activations fail + self ! 
PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB) + + case Failure(t) => + // the container did not come up cleanly, so disambiguate the failure mode and then cleanup + // the failure is either the system fault, or for docker actions, the application/developer fault + val response = t match { + case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg) + case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg) + case _ => ActivationResponse.whiskError(t.getMessage) + } + // construct an appropriate activation and record it in the datastore, + // also update the feed and active ack; the container cleanup is queued + // implicitly via a FailureMessage which will be processed later when the state + // transitions to Running + val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response) + self ! ActivationCompleted + sendActiveAck(transid, activation) + storeActivation(transid, activation) + }.flatMap { + container => + // now attempt to inject the user code and run the action + initializeAndRun(container, job) + .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now)) + }.pipeTo(self) goto(Running) } @@ -189,6 +215,11 @@ class ContainerProxy( context.parent ! ContainerRemoved stop() + // Activation finished either successfully or not + case Event(ActivationCompleted, _) => + context.parent ! ActivationCompleted + stay + case _ => delay } @@ -210,10 +241,7 @@ class ContainerProxy( } when(Pausing) { - case Event(ContainerPaused, data: WarmedData) => - context.parent ! NeedWork(data) - goto(Paused) - + case Event(ContainerPaused, data: WarmedData) => goto(Paused) case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data.container) case _ => delay } @@ -284,6 +312,11 @@ class ContainerProxy( /** * Runs the job, initialize first if necessary. + * Completes the job by: + * 1. sending an activate ack, + * 2. 
fetching the logs for the run, + * 3. indicating the resource is free to the parent pool, + * 4. recording the result to the data store * * @param container the container to run the job on * @param job the job to run @@ -319,13 +352,18 @@ class ContainerProxy( }.recover { case InitializationError(interval, response) => ContainerProxy.constructWhiskActivation(job, interval, response) + case t => + // Actually, this should never happen - but we want to make sure to not miss a problem + logging.error(this, s"caught unexpected error while running activation: ${t}") + ContainerProxy.constructWhiskActivation(job, Interval.zero, ActivationResponse.whiskError(Messages.abnormalRun)) } // Sending active ack and storing the activation are concurrent side-effects // and do not block further execution of the future. They are completely // asynchronous. activation.andThen { - case Success(activation) => sendActiveAck(tid, activation) + // the activation future will always complete with Success + case Success(ack) => sendActiveAck(tid, ack) }.flatMap { activation => container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs => activation.withLogs(ActivationLogs(logs.toVector)) @@ -333,6 +371,7 @@ class ContainerProxy( }.andThen { case Success(activation) => storeActivation(tid, activation) }.flatMap { activation => + self ! ActivationCompleted // Fail the future iff the activation was unsuccessful to facilitate // better cleanup logic. if (activation.response.isSuccess) Future.successful(activation) @@ -353,7 +392,7 @@ object ContainerProxy { * Generates a unique container name. 
* * @param prefix the container name's prefix - * @param suffic the container name's suffix + * @param suffix the container name's suffix * @return a unique container name */ def containerName(prefix: String, suffix: String) = diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala index b839713..716d688 100644 --- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala +++ b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala @@ -90,8 +90,8 @@ protected class ActivationFeed( case (records, count) => records foreach { case (topic, partition, offset, bytes) => - logging.info(this, s"processing $topic[$partition][$offset ($count)]") pipelineOccupancy += 1 + logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]") handler(topic, bytes) } } recover { @@ -101,8 +101,12 @@ protected class ActivationFeed( fill() } else logging.debug(this, "dropping fill request until feed is drained") - case _: ActivationNotification => + case n: ActivationNotification => pipelineOccupancy -= 1 + logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold") + if (pipelineOccupancy < 0) { + logging.error(this, "pipelineOccupancy<0") + } fill() } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index f6ab973..1d07de9 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -42,11 +42,11 @@ import whisk.core.containerpool.Run import whisk.core.containerpool.docker.DockerClientWithFileAccess import whisk.core.containerpool.docker.DockerContainer import whisk.core.containerpool.docker.RuncClient 
+import whisk.core.dispatcher.ActivationFeed.FailedActivation import whisk.core.dispatcher.MessageHandler import whisk.core.entity._ import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ -import whisk.core.dispatcher.ActivationFeed.ContainerReleased import whisk.core.containerpool.ContainerPool import whisk.core.database.NoDocumentException import whisk.http.Messages @@ -192,7 +192,7 @@ class InvokerReactive( Parameters("path", msg.action.toString.toJson) ++ causedBy }) - activationFeed ! ContainerReleased + activationFeed ! FailedActivation(msg.transid) ack(msg.transid, activation) store(msg.transid, activation) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index 2513f9b..567fe6d 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -106,12 +106,12 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) behavior of "ContainerPool" - it should "indicate free resources to the feed only if a warm container responds" in within(timeout) { + it should "indicate free resources to the feed once activations finish" in within(timeout) { val (containers, factory) = testContainers(1) val feed = TestProbe() val pool = system.actorOf(ContainerPool.props(factory, 0, feed.ref)) - containers(0).send(pool, NeedWork(warmedData())) + containers(0).send(pool, ActivationCompleted) feed.expectMsg(ContainerReleased) } @@ -156,6 +156,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) + containers(0).send(pool, ActivationCompleted) feed.expectMsg(ContainerReleased) pool ! 
runMessageDifferentEverything containers(0).expectMsg(Remove) @@ -171,6 +172,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) + containers(0).send(pool, ActivationCompleted) feed.expectMsg(ContainerReleased) pool ! runMessageDifferentNamespace containers(0).expectMsg(Remove) @@ -186,6 +188,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) + containers(0).send(pool, ActivationCompleted) feed.expectMsg(ContainerReleased) pool ! runMessage containers(0).expectMsg(runMessage) diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 2094756..0701ef6 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -104,6 +104,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) def run(machine: ActorRef, currentState: ContainerState) = { machine ! Run(action, message) expectMsg(Transition(machine, currentState, Running)) + expectMsg(ActivationCompleted) expectWarmed(invocationNamespace.name, action) expectMsg(Transition(machine, Running, Ready)) } @@ -124,7 +125,6 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) /** Expect the container to pause successfully */ def expectPause(machine: ActorRef) = { expectMsg(Transition(machine, Ready, Pausing)) - expectWarmed(invocationNamespace.name, action) expectMsg(Transition(machine, Pausing, Paused)) } @@ -271,6 +271,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) registerCallback(machine) machine ! 
Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) + expectMsg(ActivationCompleted) expectMsg(ContainerRemoved) awaitAssert { @@ -299,6 +300,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) + expectMsg(ActivationCompleted) expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself expectMsg(Transition(machine, Running, Removing)) @@ -327,6 +329,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) + expectMsg(ActivationCompleted) expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself expectMsg(Transition(machine, Running, Removing)) @@ -426,6 +429,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) // Finish /init, note that /run and log-collecting happens nonetheless initPromise.success(Interval.zero) + expectMsg(ActivationCompleted) expectWarmed(invocationNamespace.name, action) expectMsg(Transition(machine, Running, Ready)) -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.