This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b3fb4b7  Actor state safety improvements. (#2802)
b3fb4b7 is described below

commit b3fb4b78f1e120fac1c9891ae587d9157f1b4b7b
Author: Brendan McAdams <brendan@bytes.codes>
AuthorDate: Wed Nov 8 08:23:26 2017 -0800

    Actor state safety improvements. (#2802)
    
    * Move the MessageConsumer-internal `outstandingMessages` Queue to an
      immutable collection
    - A mutable collection held in actor state risks leaking outside the
      actor (and then being mutated externally) if it is accidentally sent
      in a message
    - Instead, a mutable var (always protected by actor isolation) pointing
      to an immutable collection provides better actor safety as a best
      practice; see the sketch below
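      A minimal sketch of the pattern, assuming a hypothetical QueueHolder
      actor (not the actual MessageConsumer code):

        import akka.actor.Actor
        import scala.collection.immutable

        case class Enqueue(item: String)
        case object Drain

        class QueueHolder extends Actor {
          // var + immutable Queue: reassignment happens only inside this actor,
          // and anything handed out in a message is itself immutable, so a
          // recipient can never mutate this actor's state
          private var outstanding = immutable.Queue.empty[String]

          def receive = {
            case Enqueue(item) => outstanding = outstanding.enqueue(item)
            case Drain if outstanding.nonEmpty =>
              val (head, rest) = outstanding.dequeue
              outstanding = rest
              sender() ! head
          }
        }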
    
    * Move the InvokerPool Actor's internal instance-tracking collections
      from mutable to immutable
      - These collections were already flagged as risky if closed over; this
        cleanup should obviate any risk of that leakage (including being
        messaged out of the actor)
      - The change to immutable maps required a small refactor of the
        `PingMessage` receive branch; see the sketch below
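      A minimal sketch of the lookup-or-register refactor, assuming a
      hypothetical Registry actor and register helper (not the actual
      InvokerPool code):

        import akka.actor.{Actor, ActorRef, Props}

        class Registry extends Actor {
          // var + immutable Map replaces mutable.Map with getOrElseUpdate
          private var byId = Map.empty[Int, ActorRef]

          // the side-effecting registration, pulled out into its own method
          private def register(id: Int): ActorRef =
            context.actorOf(Props.empty, s"child-$id") // placeholder child actor

          def receive = {
            case id: Int =>
              val ref = byId.getOrElse(id, register(id))
              byId = byId.updated(id, ref) // idempotent if the key was present
              ref ! "ping"
          }
        }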
    
    * Minor style fix: use foreach instead of map where the mapped function
      returned Unit; see the example below
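      For illustration (a generic example, not project code): when the function
      is applied only for its side effect, foreach states that intent and avoids
      allocating a collection of Unit that is immediately discarded:

        val refs = List("a", "b", "c")
        refs.map(println)     // before: builds a List[Unit] that is discarded
        refs.foreach(println) // after: runs the side effect only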
    
    * Obvious cleanup/optimization of my last revision of the PingMessage branch
    
    * In the ActivationFinisher Actor, replace the mutable state collection
      with an immutable one; see the sketch below
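      A minimal sketch of that pattern, assuming a hypothetical Poller actor
      (not the actual ActivationFinisher code):

        import akka.actor.{Actor, Cancellable}
        import scala.concurrent.duration._

        class Poller extends Actor {
          import context.dispatcher // ExecutionContext for the scheduler

          // var + immutable Vector instead of a mutable Buffer[Cancellable]
          private var pending = Vector.empty[Cancellable]

          def receive = {
            case "schedule" =>
              pending = pending :+ context.system.scheduler.scheduleOnce(1.second, self, "tick")
            case "tick" => // work would happen here
            case "shutdown" =>
              pending.foreach(_.cancel())
              pending = Vector.empty
              context.stop(self)
          }
        }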
    
    * ContainerPool Actor immutability cleanup
      - migrate the mutable free/busy/prewarmed pools to immutable maps
      - should prevent accidental state leakage via closures and messages;
        see the sketch below
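      A minimal sketch of the pool-transition pattern, assuming a hypothetical
      PoolState holder (not the actual ContainerPool code):

        import akka.actor.ActorRef

        object PoolState {
          var freePool = Map.empty[ActorRef, String]
          var busyPool = Map.empty[ActorRef, String]

          // move a worker from the free pool to the busy pool by rebinding both
          // vars to updated immutable maps; no shared collection is ever mutated
          def markBusy(worker: ActorRef, data: String): Unit = {
            busyPool = busyPool + (worker -> data)
            freePool = freePool - worker
          }
        }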
    
    * Spacing fix for style guidelines
    
    * Remove unnecessary usage of "new" to instantiate a case class; see the
      example below
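      For illustration (a generic example): a case class gets a synthesized
      companion apply method, so new is redundant:

        case class Point(x: Int, y: Int)
        val a = new Point(1, 2) // compiles, but the new is unnecessary
        val b = Point(1, 2)     // idiomatic: calls the synthesized Point.apply
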
---
 .../whisk/core/connector/MessageConsumer.scala     | 20 +++++++---
 .../core/controller/actions/PrimitiveActions.scala |  9 ++---
 .../core/loadBalancer/InvokerSupervision.scala     | 45 +++++++++++++---------
 .../whisk/core/containerpool/ContainerPool.scala   | 45 ++++++++++++----------
 4 files changed, 69 insertions(+), 50 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 31fcea0..5197767 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -18,14 +18,12 @@
 package whisk.core.connector
 
 import scala.annotation.tailrec
-import scala.collection.mutable
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.blocking
 import scala.concurrent.duration._
 import scala.util.Failure
-
 import org.apache.kafka.clients.consumer.CommitFailedException
-
 import akka.actor.FSM
 import akka.pattern.pipe
 import whisk.common.Logging
@@ -111,7 +109,13 @@ class MessageFeed(description: String,
     consumer.maxPeek <= maxPipelineDepth,
     "consumer may not yield more messages per peek than permitted by max 
depth")
 
-  private val outstandingMessages = mutable.Queue[(String, Int, Long, Array[Byte])]()
+  // Immutable Queue
+  // although on the surface it seems to make sense to use an immutable variable with a mutable Queue,
+  // Akka Actor state defies the usual "prefer immutable" guideline in Scala, esp. w/ Collections.
+  // If, for some reason, this Queue was mutable and is accidentally leaked in say an Akka message,
+  // another Actor or recipient would be able to mutate the internal state of this Actor.
+  // Best practice dictates a mutable variable pointing at an immutable collection for this reason
+  private var outstandingMessages = immutable.Queue.empty[(String, Int, Long, Array[Byte])]
   private var handlerCapacity = maximumHandlerCapacity
 
   private implicit val tid = TransactionId.dispatcher
@@ -137,7 +141,7 @@ class MessageFeed(description: String,
       stay
 
     case Event(FillCompleted(messages), _) =>
-      outstandingMessages.enqueue(messages: _*)
+      outstandingMessages = outstandingMessages ++ messages
       sendOutstandingMessages()
 
       if (shouldFillQueue()) {
@@ -202,7 +206,11 @@ class MessageFeed(description: String,
   private def sendOutstandingMessages(): Unit = {
     val occupancy = outstandingMessages.size
     if (occupancy > 0 && handlerCapacity > 0) {
-      val (topic, partition, offset, bytes) = outstandingMessages.dequeue()
+      // Easiest way with an immutable queue to cleanly dequeue
+      // Head is the first element of the queue, desugared w/ an assignment pattern
+      // Tail is everything but the first element, thus mutating the collection variable
+      val (topic, partition, offset, bytes) = outstandingMessages.head
+      outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.info(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
       handler(bytes)
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f159192..2a9fa1f 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,7 +17,6 @@
 
 package whisk.core.controller.actions
 
-import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
@@ -258,7 +257,7 @@ protected[actions] object ActivationFinisher {
     // when the future completes, self-destruct
     promise.future.andThen { case _ => shutdown() }
 
-    val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty
+    var preemptiveMsgs = Vector.empty[Cancellable]
 
     def receive = {
       case ActivationFinisher.Finish(activation) =>
@@ -267,13 +266,13 @@ protected[actions] object ActivationFinisher {
       case msg @ Scheduler.WorkOnceNow =>
        // try up to three times when pre-empting the schedule
         fastPollPeriods.foreach { s =>
-          preemptiveMsgs += context.system.scheduler.scheduleOnce(s, poller, msg)
+          preemptiveMsgs = preemptiveMsgs :+ context.system.scheduler.scheduleOnce(s, poller, msg)
         }
     }
 
     def shutdown(): Unit = {
       preemptiveMsgs.foreach(_.cancel())
-      preemptiveMsgs.clear()
+      preemptiveMsgs = Vector.empty
       context.stop(poller)
       context.stop(self)
     }
@@ -281,7 +280,7 @@ protected[actions] object ActivationFinisher {
     override def postStop() = {
       logging.info(this, "finisher shutdown")
       preemptiveMsgs.foreach(_.cancel())
-      preemptiveMsgs.clear()
+      preemptiveMsgs = Vector.empty
       context.stop(poller)
     }
   }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 13517de..d188ce4 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -19,7 +19,7 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
 
-import scala.collection.mutable
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
@@ -82,33 +82,24 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   implicit val timeout = Timeout(5.seconds)
   implicit val ec = context.dispatcher
 
-  // State of the actor. It's important not to close over these
-  // references directly, so they don't escape the Actor.
-  val instanceToRef = mutable.Map[InstanceId, ActorRef]()
-  val refToInstance = mutable.Map[ActorRef, InstanceId]()
+  // State of the actor. Mutable vars with immutable collections prevent closures or messages
+  // from leaking the state for external mutation
+  var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
+  var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
   var status = IndexedSeq[(InstanceId, InvokerState)]()
 
   def receive = {
     case p: PingMessage =>
-      val invoker = instanceToRef.getOrElseUpdate(p.instance, {
-        logging.info(this, s"registered a new invoker: invoker${p.instance.toInt}")(TransactionId.invokerHealth)
+      val invoker = instanceToRef.getOrElse(p.instance, registerInvoker(p.instance))
+      instanceToRef = instanceToRef.updated(p.instance, invoker)
 
-        status = padToIndexed(status, p.instance.toInt + 1, i => (InstanceId(i), Offline))
-
-        val ref = childFactory(context, p.instance)
-        ref ! SubscribeTransitionCallBack(self) // register for state change events
-
-        refToInstance.update(ref, p.instance)
-        ref
-      })
       invoker.forward(p)
 
     case GetStatus => sender() ! status
 
-    case msg: InvocationFinishedMessage => {
+    case msg: InvocationFinishedMessage =>
       // Forward message to invoker, if InvokerActor exists
-      instanceToRef.get(msg.invokerInstance).map(_.forward(msg))
-    }
+      instanceToRef.get(msg.invokerInstance).foreach(_.forward(msg))
 
     case CurrentState(invoker, currentState: InvokerState) =>
       refToInstance.get(invoker).foreach { instance =>
@@ -159,6 +150,22 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
 
   /** Pads a list to a given length using the given function to compute entries */
   def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f)
+
+  // Register a new invoker
+  def registerInvoker(instanceId: InstanceId): ActorRef = {
+    logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
+
+    status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i), Offline))
+
+    val ref = childFactory(context, instanceId)
+
+    ref ! SubscribeTransitionCallBack(self) // register for state change events
+
+    refToInstance = refToInstance.updated(ref, instanceId)
+
+    ref
+  }
+
 }
 
 object InvokerPool {
@@ -179,7 +186,7 @@ object InvokerPool {
     new WhiskAction(
       namespace = healthActionIdentity.namespace.toPath,
       name = EntityName(s"invokerHealthTestAction${i.toInt}"),
-      exec = new CodeExecAsString(manifest, """function main(params) { return params; }""", None))
+      exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None))
   }
 }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 22b2b12..b02f528 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -17,7 +17,7 @@
 
 package whisk.core.containerpool
 
-import scala.collection.mutable
+import scala.collection.immutable
 
 import akka.actor.Actor
 import akka.actor.ActorRef
@@ -67,9 +67,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     extends Actor {
   implicit val logging = new AkkaLogging(context.system.log)
 
-  val freePool = mutable.Map[ActorRef, ContainerData]()
-  val busyPool = mutable.Map[ActorRef, ContainerData]()
-  val prewarmedPool = mutable.Map[ActorRef, ContainerData]()
+  var freePool = immutable.Map.empty[ActorRef, ContainerData]
+  var busyPool = immutable.Map.empty[ActorRef, ContainerData]
+  var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
 
   prewarmConfig.foreach { config =>
     logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} 
containers")
@@ -84,7 +84,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       val container = if (busyPool.size < maxActiveContainers) {
         // Schedule a job to a warm container
         ContainerPool
-          .schedule(r.action, r.msg.user.namespace, freePool.toMap)
+          .schedule(r.action, r.msg.user.namespace, freePool)
           .orElse {
             if (busyPool.size + freePool.size < maxPoolSize) {
               takePrewarmContainer(r.action).orElse {
@@ -94,7 +94,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           }
           .orElse {
             // Remove a container and create a new one for the given job
-            ContainerPool.remove(r.action, r.msg.user.namespace, freePool.toMap).map { toDelete =>
+            ContainerPool.remove(r.action, r.msg.user.namespace, freePool).map { toDelete =>
               removeContainer(toDelete)
               takePrewarmContainer(r.action).getOrElse {
                 createContainer()
@@ -105,8 +105,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
       container match {
         case Some((actor, data)) =>
-          busyPool.update(actor, data)
-          freePool.remove(actor)
+          busyPool = busyPool + (actor -> data)
+          freePool = freePool - actor
           actor ! r // forwards the run request to the container
         case None =>
           logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid)
@@ -115,26 +115,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
     // Container is free to take more work
     case NeedWork(data: WarmedData) =>
-      freePool.update(sender(), data)
-      busyPool.remove(sender()).foreach(_ => feed ! MessageFeed.Processed)
+      freePool = freePool + (sender() -> data)
+      busyPool.get(sender()).foreach { _ =>
+        busyPool = busyPool - sender()
+        feed ! MessageFeed.Processed
+      }
 
     // Container is prewarmed and ready to take work
     case NeedWork(data: PreWarmedData) =>
-      prewarmedPool.update(sender(), data)
+      prewarmedPool = prewarmedPool + (sender() -> data)
 
     // Container got removed
     case ContainerRemoved =>
-      freePool.remove(sender())
-      busyPool.remove(sender()).foreach(_ => feed ! MessageFeed.Processed)
+      freePool = freePool - sender()
+      busyPool.get(sender()).foreach { _ =>
+        busyPool = busyPool - sender()
+        feed ! MessageFeed.Processed
+      }
   }
 
   /** Creates a new container and updates state accordingly. */
   def createContainer(): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
     val data = NoData()
-    freePool.update(ref, data)
-
-    (ref, data)
+    freePool = freePool + (ref -> data)
+    ref -> data
   }
 
   /** Creates a new prewarmed container */
@@ -160,8 +165,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
         .map {
           case (ref, data) =>
             // Move the container to the usual pool
-            freePool.update(ref, data)
-            prewarmedPool.remove(ref)
+            freePool = freePool + (ref -> data)
+            prewarmedPool = prewarmedPool - ref
             // Create a new prewarm container
             prewarmContainer(config.exec, config.memoryLimit)
 
@@ -172,8 +177,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   /** Removes a container and updates state accordingly. */
   def removeContainer(toDelete: ActorRef) = {
     toDelete ! Remove
-    freePool.remove(toDelete)
-    busyPool.remove(toDelete)
+    freePool = freePool - toDelete
+    busyPool = busyPool - toDelete
   }
 }
 
