This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b3fb4b7 Actor state safety improvements. (#2802) b3fb4b7 is described below commit b3fb4b78f1e120fac1c9891ae587d9157f1b4b7b Author: Brendan McAdams <brendan@bytes.codes> AuthorDate: Wed Nov 8 08:23:26 2017 -0800 Actor state safety improvements. (#2802) * Move MessageConsumer internal `outstandingMessages` Queue to immutable - There's a risk with a mutable collection in actor state of leaking outside the actor (and then mutated outside) if sent in a message accidentally - Instead, a mutable var (always protected by actor isolation) pointing to an immutable collection provides better actor safety as a best practice * Move InvokerPool Actor internal instance tracking collections to immutable from mutable - As these collection items were already marked as risky if closed over, this cleanup should obviate any risk of that leakage (including being messaged out of the stack) - Change to immutable maps required a small refactor of `PingMessage` receive branch * Minor style fix to use foreach instead of map where unit was being returned in the map * Obvious cleanup/optimization of my last revision of PingMessage branch * In ActivationFinisher Actor, replace mutable state collection with immutable * ContainerPool Actor immutability cleanup - migrate mutable free/busy/prewarmed pools to immutable maps - should prevent accidental state leakage via closures and messages * Spacing fix for style guidelines * Remove unnecessary usage of "new" to instantiate a case class --- .../whisk/core/connector/MessageConsumer.scala | 20 +++++++--- .../core/controller/actions/PrimitiveActions.scala | 9 ++--- .../core/loadBalancer/InvokerSupervision.scala | 45 +++++++++++++--------- .../whisk/core/containerpool/ContainerPool.scala | 45 ++++++++++++---------- 4 files changed, 69 insertions(+), 50 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala 
b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala index 31fcea0..5197767 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala @@ -18,14 +18,12 @@ package whisk.core.connector import scala.annotation.tailrec -import scala.collection.mutable +import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.blocking import scala.concurrent.duration._ import scala.util.Failure - import org.apache.kafka.clients.consumer.CommitFailedException - import akka.actor.FSM import akka.pattern.pipe import whisk.common.Logging @@ -111,7 +109,13 @@ class MessageFeed(description: String, consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth") - private val outstandingMessages = mutable.Queue[(String, Int, Long, Array[Byte])]() + // Immutable Queue + // although on the surface it seems to make sense to use an immutable variable with a mutable Queue, + // Akka Actor state defies the usual "prefer immutable" guideline in Scala, esp. w/ Collections. + // If, for some reason, this Queue was mutable and is accidentally leaked in say an Akka message, + // another Actor or recipient would be able to mutate the internal state of this Actor. 
+ // Best practice dictates a mutable variable pointing at an immutable collection for this reason + private var outstandingMessages = immutable.Queue.empty[(String, Int, Long, Array[Byte])] private var handlerCapacity = maximumHandlerCapacity private implicit val tid = TransactionId.dispatcher @@ -137,7 +141,7 @@ class MessageFeed(description: String, stay case Event(FillCompleted(messages), _) => - outstandingMessages.enqueue(messages: _*) + outstandingMessages = outstandingMessages ++ messages sendOutstandingMessages() if (shouldFillQueue()) { @@ -202,7 +206,11 @@ class MessageFeed(description: String, private def sendOutstandingMessages(): Unit = { val occupancy = outstandingMessages.size if (occupancy > 0 && handlerCapacity > 0) { - val (topic, partition, offset, bytes) = outstandingMessages.dequeue() + // Easiest way with an immutable queue to cleanly dequeue + // Head is the first element of the queue, destructured w/ a tuple pattern + // Tail is everything but the first element; reassigning the var (not mutating the queue) drops the head + val (topic, partition, offset, bytes) = outstandingMessages.head + outstandingMessages = outstandingMessages.tail if (logHandoff) logging.info(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)") handler(bytes) diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index f159192..2a9fa1f 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -17,7 +17,6 @@ package whisk.core.controller.actions -import scala.collection.mutable.Buffer import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise @@ -258,7 +257,7 @@ protected[actions] object ActivationFinisher { // when the future completes, self-destruct
promise.future.andThen { case _ => shutdown() } - val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty + var preemptiveMsgs = Vector.empty[Cancellable] def receive = { case ActivationFinisher.Finish(activation) => @@ -267,13 +266,13 @@ protected[actions] object ActivationFinisher { case msg @ Scheduler.WorkOnceNow => // try up to three times when pre-emptying the schedule fastPollPeriods.foreach { s => - preemptiveMsgs += context.system.scheduler.scheduleOnce(s, poller, msg) + preemptiveMsgs = preemptiveMsgs :+ context.system.scheduler.scheduleOnce(s, poller, msg) } } def shutdown(): Unit = { preemptiveMsgs.foreach(_.cancel()) - preemptiveMsgs.clear() + preemptiveMsgs = Vector.empty context.stop(poller) context.stop(self) } @@ -281,7 +280,7 @@ protected[actions] object ActivationFinisher { override def postStop() = { logging.info(this, "finisher shutdown") preemptiveMsgs.foreach(_.cancel()) - preemptiveMsgs.clear() + preemptiveMsgs = Vector.empty context.stop(poller) } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 13517de..d188ce4 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -19,7 +19,7 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets -import scala.collection.mutable +import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure @@ -82,33 +82,24 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, implicit val timeout = Timeout(5.seconds) implicit val ec = context.dispatcher - // State of the actor. It's important not to close over these - // references directly, so they don't escape the Actor. 
- val instanceToRef = mutable.Map[InstanceId, ActorRef]() - val refToInstance = mutable.Map[ActorRef, InstanceId]() + // State of the actor. Mutable vars with immutable collections prevents closures or messages + // from leaking the state for external mutation + var instanceToRef = immutable.Map.empty[InstanceId, ActorRef] + var refToInstance = immutable.Map.empty[ActorRef, InstanceId] var status = IndexedSeq[(InstanceId, InvokerState)]() def receive = { case p: PingMessage => - val invoker = instanceToRef.getOrElseUpdate(p.instance, { - logging.info(this, s"registered a new invoker: invoker${p.instance.toInt}")(TransactionId.invokerHealth) + val invoker = instanceToRef.getOrElse(p.instance, registerInvoker(p.instance)) + instanceToRef = instanceToRef.updated(p.instance, invoker) - status = padToIndexed(status, p.instance.toInt + 1, i => (InstanceId(i), Offline)) - - val ref = childFactory(context, p.instance) - ref ! SubscribeTransitionCallBack(self) // register for state change events - - refToInstance.update(ref, p.instance) - ref - }) invoker.forward(p) case GetStatus => sender() ! 
status - case msg: InvocationFinishedMessage => { + case msg: InvocationFinishedMessage => // Forward message to invoker, if InvokerActor exists - instanceToRef.get(msg.invokerInstance).map(_.forward(msg)) - } + instanceToRef.get(msg.invokerInstance).foreach(_.forward(msg)) case CurrentState(invoker, currentState: InvokerState) => refToInstance.get(invoker).foreach { instance => @@ -159,6 +150,22 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, /** Pads a list to a given length using the given function to compute entries */ def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f) + + // Register a new invoker + def registerInvoker(instanceId: InstanceId): ActorRef = { + logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth) + + status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i), Offline)) + + val ref = childFactory(context, instanceId) + + ref ! 
SubscribeTransitionCallBack(self) // register for state change events + + refToInstance = refToInstance.updated(ref, instanceId) + + ref + } + } object InvokerPool { @@ -179,7 +186,7 @@ object InvokerPool { new WhiskAction( namespace = healthActionIdentity.namespace.toPath, name = EntityName(s"invokerHealthTestAction${i.toInt}"), - exec = new CodeExecAsString(manifest, """function main(params) { return params; }""", None)) + exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None)) } } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 22b2b12..b02f528 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -17,7 +17,7 @@ package whisk.core.containerpool -import scala.collection.mutable +import scala.collection.immutable import akka.actor.Actor import akka.actor.ActorRef @@ -67,9 +67,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, extends Actor { implicit val logging = new AkkaLogging(context.system.log) - val freePool = mutable.Map[ActorRef, ContainerData]() - val busyPool = mutable.Map[ActorRef, ContainerData]() - val prewarmedPool = mutable.Map[ActorRef, ContainerData]() + var freePool = immutable.Map.empty[ActorRef, ContainerData] + var busyPool = immutable.Map.empty[ActorRef, ContainerData] + var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData] prewarmConfig.foreach { config => logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} containers") @@ -84,7 +84,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, val container = if (busyPool.size < maxActiveContainers) { // Schedule a job to a warm container ContainerPool - .schedule(r.action, r.msg.user.namespace, freePool.toMap) + .schedule(r.action, r.msg.user.namespace, freePool) .orElse { if 
(busyPool.size + freePool.size < maxPoolSize) { takePrewarmContainer(r.action).orElse { @@ -94,7 +94,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, } .orElse { // Remove a container and create a new one for the given job - ContainerPool.remove(r.action, r.msg.user.namespace, freePool.toMap).map { toDelete => + ContainerPool.remove(r.action, r.msg.user.namespace, freePool).map { toDelete => removeContainer(toDelete) takePrewarmContainer(r.action).getOrElse { createContainer() @@ -105,8 +105,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, container match { case Some((actor, data)) => - busyPool.update(actor, data) - freePool.remove(actor) + busyPool = busyPool + (actor -> data) + freePool = freePool - actor actor ! r // forwards the run request to the container case None => logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid) @@ -115,26 +115,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, // Container is free to take more work case NeedWork(data: WarmedData) => - freePool.update(sender(), data) - busyPool.remove(sender()).foreach(_ => feed ! MessageFeed.Processed) + freePool = freePool + (sender() -> data) + busyPool.get(sender()).foreach { _ => + busyPool = busyPool - sender() + feed ! MessageFeed.Processed + } // Container is prewarmed and ready to take work case NeedWork(data: PreWarmedData) => - prewarmedPool.update(sender(), data) + prewarmedPool = prewarmedPool + (sender() -> data) // Container got removed case ContainerRemoved => - freePool.remove(sender()) - busyPool.remove(sender()).foreach(_ => feed ! MessageFeed.Processed) + freePool = freePool - sender() + busyPool.get(sender()).foreach { _ => + busyPool = busyPool - sender() + feed ! MessageFeed.Processed + } } /** Creates a new container and updates state accordingly. 
*/ def createContainer(): (ActorRef, ContainerData) = { val ref = childFactory(context) val data = NoData() - freePool.update(ref, data) - - (ref, data) + freePool = freePool + (ref -> data) + ref -> data } /** Creates a new prewarmed container */ @@ -160,8 +165,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, .map { case (ref, data) => // Move the container to the usual pool - freePool.update(ref, data) - prewarmedPool.remove(ref) + freePool = freePool + (ref -> data) + prewarmedPool = prewarmedPool - ref // Create a new prewarm container prewarmContainer(config.exec, config.memoryLimit) @@ -172,8 +177,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, /** Removes a container and updates state accordingly. */ def removeContainer(toDelete: ActorRef) = { toDelete ! Remove - freePool.remove(toDelete) - busyPool.remove(toDelete) + freePool = freePool - toDelete + busyPool = busyPool - toDelete } } -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].