markusthoemmes closed pull request #2932: refactoring loadbalancing service with 
an overflow queue
URL: https://github.com/apache/incubator-openwhisk/pull/2932
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
once it has been merged, so the full diff is reproduced below:

diff --git a/ansible/environments/docker-machine/group_vars/all 
b/ansible/environments/docker-machine/group_vars/all
index 20f7228c94..21c5e75684 100644
--- a/ansible/environments/docker-machine/group_vars/all
+++ b/ansible/environments/docker-machine/group_vars/all
@@ -29,3 +29,9 @@ controller_arguments: '-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxre
 invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
+
+
+limit_invocations_per_minute: 60000
+limit_invocations_concurrent: 30000
+limit_invocations_concurrent_system: 50000
+limit_fires_per_minute: 60
\ No newline at end of file
diff --git a/ansible/environments/local/group_vars/all 
b/ansible/environments/local/group_vars/all
index d99d85eb23..8dbbe14082 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -23,3 +23,8 @@ controller_arguments: '-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxre
 invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
+
+limit_invocations_per_minute: 60000
+limit_invocations_concurrent: 30000
+limit_invocations_concurrent_system: 50000
+limit_fires_per_minute: 60
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 3059f6b051..742397f53d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -18,7 +18,6 @@
 package whisk.core.connector
 
 import scala.util.Try
-
 import spray.json._
 import whisk.common.TransactionId
 import whisk.core.entity.ActivationId
@@ -118,3 +117,28 @@ object PingMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
   implicit val serdes = jsonFormat(PingMessage.apply _, "name")
 }
+
+case class OverflowMessage(override val transid: TransactionId,
+                           msg: ActivationMessage,
+                           actionTimeoutSeconds: Int,
+                           hash: Int,
+                           pull: Boolean,
+                           originalController: InstanceId)
+    extends Message {
+//  def meta =
+//    JsObject("meta" -> {
+//      cause map { c =>
+//        JsObject(c.toJsObject.fields ++ msg.toJsObject.fields)
+//      } getOrElse {
+//        activationId.toJsObject
+//      }
+//    })
+  override def serialize: String = {
+    OverflowMessage.serdes.write(this).compactPrint
+  }
+}
+
+object OverflowMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[OverflowMessage] = 
Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat6(OverflowMessage.apply)
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala 
b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 31fcea09ba..ff1208bfb6 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -17,15 +17,14 @@
 
 package whisk.core.connector
 
+import akka.actor.ActorRef
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.blocking
 import scala.concurrent.duration._
 import scala.util.Failure
-
 import org.apache.kafka.clients.consumer.CommitFailedException
-
 import akka.actor.FSM
 import akka.pattern.pipe
 import whisk.common.Logging
@@ -72,6 +71,9 @@ object MessageFeed {
   /** Steady state message, indicates capacity in downstream process to 
receive more messages. */
   object Processed
 
+  /** message to indicate max offset is reached */
+  object MaxOffset
+
   /** Indicates the fill operation has completed. */
   private case class FillCompleted(messages: Seq[(String, Int, Long, 
Array[Byte])])
 }
@@ -99,7 +101,8 @@ class MessageFeed(description: String,
                   longPollDuration: FiniteDuration,
                   handler: Array[Byte] => Future[Unit],
                   autoStart: Boolean = true,
-                  logHandoff: Boolean = true)
+                  logHandoff: Boolean = true,
+                  offsetMonitor: Option[ActorRef] = None)
     extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
   import MessageFeed._
 
@@ -181,6 +184,10 @@ class MessageFeed(description: String,
           // of the commit should be masked.
           val records = consumer.peek(longPollDuration)
           consumer.commit()
+          if (records.size < maxPipelineDepth) {
+            //reached the max offset
+            offsetMonitor.foreach(_ ! MaxOffset)
+          }
           FillCompleted(records.toSeq)
         }
       }.andThen {
diff --git a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala 
b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
index 6aa0f6e884..44592a445a 100644
--- a/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
+++ b/common/scala/src/main/scala/whisk/spi/SpiLoader.scala
@@ -44,7 +44,8 @@ object SpiLoader {
 
 /** Lookup the classname for the SPI impl based on a key in the provided 
Config */
 object TypesafeConfigClassResolver extends SpiClassResolver {
-  private val config = ConfigFactory.load()
+  //allow tests to inject a config
+  var config = ConfigFactory.load()
 
   override def getClassNameForType[T: Manifest]: String =
     config.getString("whisk.spi." + manifest[T].runtimeClass.getSimpleName)
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala 
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index bb723bc627..285716d81c 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -23,6 +23,7 @@ import scala.concurrent.Future
 import scala.util.{Failure, Success}
 import akka.Done
 import akka.actor.ActorSystem
+import akka.cluster.Cluster
 import akka.actor.CoordinatedShutdown
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.Uri
@@ -39,13 +40,18 @@ import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
+import whisk.core.connector.MessagingProvider
 import whisk.core.database.RemoteCacheInvalidation
 import whisk.core.database.CacheChangeNotification
 import whisk.core.entitlement._
 import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.{LoadBalancerService}
+import whisk.core.loadBalancer.DistributedLoadBalancerData
+import whisk.core.loadBalancer.LoadBalancerActorService
+import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.LocalLoadBalancerData
+import whisk.core.loadBalancer.StaticSeedNodesProvider
 import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
 import whisk.spi.SpiLoader
@@ -114,7 +120,30 @@ class Controller(val instance: InstanceId,
   })
 
   // initialize backend services
-  private implicit val loadBalancer = new LoadBalancerService(whiskConfig, 
instance, entityStore)
+
+  /** Feature switch for shared load balancer data **/
+  private val loadBalancerData = {
+    if (whiskConfig.controllerLocalBookkeeping) {
+      new LocalLoadBalancerData()
+    } else {
+
+      /** Specify how seed nodes are generated */
+      val seedNodesProvider = new 
StaticSeedNodesProvider(whiskConfig.controllerSeedNodes, actorSystem.name)
+      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
+      new DistributedLoadBalancerData(instance)
+    }
+  }
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  val maxPingsPerPoll = 128
+  val pingConsumer =
+    messagingProvider.getConsumer(whiskConfig, s"health${instance.toInt}", 
"health", maxPeek = maxPingsPerPoll)
+  private val messageProducer = messagingProvider.getProducer(whiskConfig, 
actorSystem.dispatcher)
+  private implicit val loadBalancer = new LoadBalancerActorService(
+    whiskConfig,
+    instance,
+    LoadBalancerService
+      .createInvokerPool(instance, actorSystem, actorSystem.dispatcher, 
entityStore, messageProducer, pingConsumer),
+    loadBalancerData)
   private implicit val entitlementProvider = new 
LocalEntitlementProvider(whiskConfig, loadBalancer)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = 
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala 
b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 8015f60d91..84e5c5517a 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -18,7 +18,6 @@
 package whisk.core.controller
 
 import scala.concurrent.ExecutionContext
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.model.Uri
@@ -27,7 +26,6 @@ import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.model.headers._
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.stream.ActorMaterializer
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.core.database.CacheChangeNotification
@@ -42,7 +40,7 @@ import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.WhiskAuthStore
 import whisk.core.entity.types._
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.LoadBalancer
 
 /**
  * Abstract class which provides basic Directives which are used to construct 
route structures
@@ -140,7 +138,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
   implicit val entityStore: EntityStore,
   implicit val entitlementProvider: EntitlementProvider,
   implicit val activationIdFactory: ActivationIdGenerator,
-  implicit val loadBalancer: LoadBalancerService,
+  implicit val loadBalancer: LoadBalancer,
   implicit val cacheChangeNotification: Some[CacheChangeNotification],
   implicit val activationStore: ActivationStore,
   implicit val logStore: LogStore,
@@ -222,7 +220,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -245,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
     implicit override val entityStore: EntityStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -258,7 +256,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
     override val entityStore: EntityStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -272,7 +270,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
     override val entitlementProvider: EntitlementProvider,
     override val activationStore: ActivationStore,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -289,7 +287,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, 
apiVersion: String)(
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val actorSystem: ActorSystem,
     override val executionContext: ExecutionContext,
     override val logging: Logging,
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
index 34b5d6708f..e2257eed87 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
@@ -17,15 +17,17 @@
 
 package whisk.core.loadBalancer
 
+import akka.actor.Actor
+import akka.actor.ActorRef
 import akka.actor.ActorSystem
+import akka.actor.Props
 import akka.util.Timeout
-import akka.pattern.ask
-import whisk.common.Logging
-import whisk.core.entity.{ActivationId, UUID}
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
+import scala.collection.mutable
 import scala.concurrent.duration._
+import whisk.common.Logging
+import whisk.core.entity.ActivationId
+import whisk.core.entity.InstanceId
+import whisk.core.entity.UUID
 
 /**
  * Encapsulates data used for loadbalancer and active-ack bookkeeping.
@@ -33,53 +35,99 @@ import scala.concurrent.duration._
  * Note: The state keeping is backed by distributed akka actors. All CRUDs 
operations are done on local values, thus
  * a stale value might be read.
  */
-class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: 
Logging) extends LoadBalancerData {
+class DistributedLoadBalancerData(instance: InstanceId, monitor: 
Option[ActorRef] = None)(
+  implicit actorSystem: ActorSystem,
+  logging: Logging)
+    extends LoadBalancerData {
 
   implicit val timeout = Timeout(5.seconds)
   implicit val executionContext = actorSystem.dispatcher
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+  private val overflowKey = "overflow"
+  private val activationsById = mutable.Map[ActivationId, ActivationEntry]()
+
+  private val localData = new LocalLoadBalancerData()
+  private var sharedDataInvokers = Map[String, Map[Int, Int]]()
+  private var sharedDataNamespaces = Map[String, Map[Int, Int]]()
+  private var sharedDataOverflow = Map[String, Map[Int, Int]]()
+
+  private val updateMonitor = actorSystem.actorOf(Props(new Actor {
+    override def receive = {
+      case Updated(storageName, entries) =>
+        monitor.foreach(_ ! Updated(storageName, entries))
+        storageName match {
+          case "Invokers"   => sharedDataInvokers = entries
+          case "Namespaces" => sharedDataNamespaces = entries
+          case "Overflow"   => sharedDataOverflow = entries
+        }
+    }
+  }))
 
   private val sharedStateInvokers = actorSystem.actorOf(
-    SharedDataService.props("Invokers"),
+    SharedDataService.props("Invokers", updateMonitor),
     name =
       "SharedDataServiceInvokers" + UUID())
   private val sharedStateNamespaces = actorSystem.actorOf(
-    SharedDataService.props("Namespaces"),
+    SharedDataService.props("Namespaces", updateMonitor),
     name =
       "SharedDataServiceNamespaces" + UUID())
-
-  def totalActivationCount =
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, 
BigInt]].map(_.values.sum.toInt)
-
-  def activationCountOn(namespace: UUID): Future[Int] = {
-    (sharedStateNamespaces ? GetMap)
-      .mapTo[Map[String, BigInt]]
-      .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
+  private val sharedStateOverflow = actorSystem.actorOf(
+    SharedDataService.props("Overflow", updateMonitor),
+    name =
+      "SharedDataServiceOverflow" + UUID())
+  def totalActivationCount = {
+    val shared = sharedDataInvokers.values.flatten.filter(_._1 != 
instance.toInt).map(_._2).sum
+    shared + localData.totalActivationCount
+  }
+  def activationCountOn(namespace: UUID): Int = {
+    val shared = sharedDataNamespaces.getOrElse(namespace.toString, 
Map()).filter(_._1 != instance.toInt).values.sum
+    shared + localData.activationCountOn(namespace)
   }
 
-  def activationCountPerInvoker: Future[Map[String, Int]] = {
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, 
BigInt]].map(_.mapValues(_.toInt))
+  def activationCountPerInvoker: Map[String, Int] = {
+    val shared = sharedDataInvokers.mapValues(_.filter(_._1 != 
instance.toInt).values.sum)
+    val local = localData.activationCountPerInvoker
+    local ++ shared.map { case (k, v) => k -> (v + local.getOrElse(k, 0)) }
   }
 
   def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
+    localData.activationById(activationId)
+    //NOTE: activations are NOT replicated, only the counters
   }
 
-  def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry = {
+  def putActivation(id: ActivationId, update: => ActivationEntry, isOverflow: 
Boolean = false): ActivationEntry = {
     activationsById.getOrElseUpdate(id, {
       val entry = update
-      sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
-      sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
+      //update the shared stats
+      //if we are processing an overflow message, do NOT double count against 
namespace
+      if (!isOverflow) {
+        sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 
instance, 1)
+      }
+      //if the initial processing msg is routed to invoker, increase the 
invoker counter
+      //otherwise increase the overflow counter
+      entry.invokerName match {
+        case Some(i) => sharedStateInvokers ! IncreaseCounter(i.toString, 
instance, 1)
+        case None    => sharedStateOverflow ! IncreaseCounter(overflowKey, 
instance, 1)
+      }
       logging.debug(this, "increased shared counters")
+      //store the activation
+      localData.putActivation(id, entry)
       entry
     })
   }
 
   def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
     activationsById.remove(entry.id).map { activationEntry =>
-      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
-      sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
-      logging.debug(this, "decreased shared counters")
+      if (!activationEntry.isOverflow) {
+        //update the shared stats
+        sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 
instance, 1)
+      }
+      entry.invokerName match {
+        case Some(i) => sharedStateInvokers ! DecreaseCounter(i.toString, 
instance, 1)
+        case None    => sharedStateOverflow ! DecreaseCounter(overflowKey, 
instance, 1)
+      }
+      logging.debug(this, s"decreased shared counters ")
+      //remove the activation
+      localData.removeActivation(entry)
       activationEntry
     }
   }
@@ -87,4 +135,13 @@ class DistributedLoadBalancerData(implicit actorSystem: 
ActorSystem, logging: Lo
   def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
     activationsById.get(aid).flatMap(removeActivation)
   }
+
+  /**
+   * Get the number of activations waiting at the overflow queue
+   * @return
+   */
+  override def overflowActivationCount = {
+    val shared = sharedDataOverflow.values.flatten.filter(_._1 != 
instance.toInt).map(_._2).sum
+    shared + localData.overflowActivationCount
+  }
 }
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 13517de5f3..564f74ca6f 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -18,7 +18,6 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -34,19 +33,25 @@ import akka.actor.FSM.SubscribeTransitionCallBack
 import akka.actor.FSM.Transition
 import akka.actor.Props
 import akka.pattern.pipe
+//import akka.pattern.ask
 import akka.util.Timeout
+import scala.concurrent.ExecutionContext
 import whisk.common.AkkaLogging
+import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.RingBuffer
 import whisk.common.TransactionId
 import whisk.core.connector._
+import whisk.core.database.NoDocumentException
 import whisk.core.entitlement.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity._
+import whisk.core.entity.types.EntityStore
 
 // Received events
 case object GetStatus
-
+case class SubscribeLoadBalancer(loadBalancerActor: ActorRef)
+case class StatusUpdate(status: IndexedSeq[(InstanceId, InvokerState)])
 case object Tick
 
 // States an Invoker can be in
@@ -87,6 +92,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) 
=> ActorRef,
   val instanceToRef = mutable.Map[InstanceId, ActorRef]()
   val refToInstance = mutable.Map[ActorRef, InstanceId]()
   var status = IndexedSeq[(InstanceId, InvokerState)]()
+  var lbActor: Option[ActorRef] = None
 
   def receive = {
     case p: PingMessage =>
@@ -94,7 +100,7 @@ class InvokerPool(childFactory: (ActorRefFactory, 
InstanceId) => ActorRef,
         logging.info(this, s"registered a new invoker: 
invoker${p.instance.toInt}")(TransactionId.invokerHealth)
 
         status = padToIndexed(status, p.instance.toInt + 1, i => 
(InstanceId(i), Offline))
-
+        lbActor.foreach(_ ! StatusUpdate(status))
         val ref = childFactory(context, p.instance)
         ref ! SubscribeTransitionCallBack(self) // register for state change 
events
 
@@ -105,6 +111,10 @@ class InvokerPool(childFactory: (ActorRefFactory, 
InstanceId) => ActorRef,
 
     case GetStatus => sender() ! status
 
+    case SubscribeLoadBalancer(lb) =>
+      lbActor = Some(lb)
+      lb ! StatusUpdate(status)
+
     case msg: InvocationFinishedMessage => {
       // Forward message to invoker, if InvokerActor exists
       instanceToRef.get(msg.invokerInstance).map(_.forward(msg))
@@ -113,12 +123,14 @@ class InvokerPool(childFactory: (ActorRefFactory, 
InstanceId) => ActorRef,
     case CurrentState(invoker, currentState: InvokerState) =>
       refToInstance.get(invoker).foreach { instance =>
         status = status.updated(instance.toInt, (instance, currentState))
+        lbActor.foreach(_ ! StatusUpdate(status))
       }
       logStatus()
 
     case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
       refToInstance.get(invoker).foreach { instance =>
         status = status.updated(instance.toInt, (instance, newState))
+        lbActor.foreach(_ ! StatusUpdate(status))
       }
       logStatus()
 
@@ -181,6 +193,29 @@ object InvokerPool {
       name = EntityName(s"invokerHealthTestAction${i.toInt}"),
       exec = new CodeExecAsString(manifest, """function main(params) { return 
params; }""", None))
   }
+
+  /**
+   * Creates or updates a health test action by updating the entity store.
+   * This method is intended for use on startup.
+   * @return Future that completes successfully iff the action is added to the 
database
+   */
+  def createTestActionForInvokerHealth(db: EntityStore, action: 
WhiskAction)(implicit logging: Logging,
+                                                                             
ec: ExecutionContext): Future[Unit] = {
+    implicit val tid = TransactionId.loadbalancer
+    WhiskAction
+      .get(db, action.docid)
+      .flatMap { oldAction =>
+        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = 
None)
+      }
+      .recover {
+        case _: NoDocumentException => WhiskAction.put(db, action)(tid, 
notifier = None)
+      }
+      .map(_ => {})
+      .andThen {
+        case Success(_) => logging.info(this, "test action for invoker health 
now exists")
+        case Failure(e) => logging.error(this, s"error creating test action 
for invoker health: $e")
+      }
+  }
 }
 
 /**
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 1866d2dcba..0972082d69 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -17,18 +17,20 @@
 
 package whisk.core.loadBalancer
 
-import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
-
-import scala.concurrent.{Future, Promise}
+import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
+import scala.concurrent.{Promise}
+import whisk.core.entity.InstanceId
 
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
-                           invokerName: InstanceId,
-                           promise: Promise[Either[ActivationId, 
WhiskActivation]])
+                           var invokerName: Option[InstanceId],
+                           promise: Promise[Either[ActivationId, 
WhiskActivation]],
+                           originalController: Option[InstanceId] = None,
+                           isOverflow: Boolean = false)
 trait LoadBalancerData {
 
   /** Get the number of activations across all namespaces. */
-  def totalActivationCount: Future[Int]
+  def totalActivationCount: Int
 
   /**
    * Get the number of activations for a specific namespace.
@@ -36,14 +38,20 @@ trait LoadBalancerData {
    * @param namespace The namespace to get the activation count for
    * @return a map (namespace -> number of activations in the system)
    */
-  def activationCountOn(namespace: UUID): Future[Int]
+  def activationCountOn(namespace: UUID): Int
 
   /**
    * Get the number of activations for each invoker.
    *
    * @return a map (invoker -> number of activations queued for the invoker)
    */
-  def activationCountPerInvoker: Future[Map[String, Int]]
+  def activationCountPerInvoker: Map[String, Int]
+
+  /**
+   * Get the number of activations waiting at the overflow queue
+   * @return
+   */
+  def overflowActivationCount: Int
 
   /**
    * Get an activation entry for a given activation id.
@@ -60,10 +68,11 @@ trait LoadBalancerData {
    * @param update block calculating the entry to add.
    *               Note: This is evaluated iff the entry
    *               didn't exist before.
+   * @param isOverflow true if this activation should count against user rates 
(otherwise only counts for invoker stats)
    * @return the entry calculated by the block or iff it did
    *         exist before the entry from the state
    */
-  def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry
+  def putActivation(id: ActivationId, update: => ActivationEntry, isOverflow: 
Boolean = false): ActivationEntry
 
   /**
    * Removes the given entry.
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 0b5a06d385..55d0dc1da2 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -17,11 +17,11 @@
 
 package whisk.core.loadBalancer
 
+import akka.actor.Actor
+import akka.actor.ActorRef
 import java.nio.charset.StandardCharsets
-
 import scala.annotation.tailrec
 import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.DurationInt
@@ -30,24 +30,29 @@ import scala.util.Success
 import org.apache.kafka.clients.producer.RecordMetadata
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
+import akka.actor.Cancellable
 import akka.actor.Props
-import akka.cluster.Cluster
 import akka.util.Timeout
 import akka.pattern.ask
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
+import whisk.core.connector.MessageConsumer
 import whisk.core.connector.{ActivationMessage, CompletionMessage}
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
+import whisk.core.connector.OverflowMessage
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.{ActivationId, WhiskActivation}
 import whisk.core.entity.EntityName
 import whisk.core.entity.ExecutableWhiskAction
-import whisk.core.entity.Identity
 import whisk.core.entity.InstanceId
 import whisk.core.entity.UUID
 import whisk.core.entity.WhiskAction
@@ -56,8 +61,6 @@ import whisk.spi.SpiLoader
 
 trait LoadBalancer {
 
-  val activeAckTimeoutGrace = 1.minute
-
   /** Gets the number of in-flight activations for a specific user. */
   def activeActivationsFor(namespace: UUID): Future[Int]
 
@@ -80,62 +83,158 @@ trait LoadBalancer {
     implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]]
 
 }
+case class Publish(action: ExecutableWhiskAction, msg: ActivationMessage)
 
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId, 
entityStore: EntityStore)(
-  implicit val actorSystem: ActorSystem,
-  logging: Logging)
+class LoadBalancerActorService(
+  config: WhiskConfig,
+  instance: InstanceId,
+  invokerPool: ActorRef,
+  loadBalancerData: LoadBalancerData)(implicit val actorSystem: ActorSystem, 
logging: Logging)
     extends LoadBalancer {
 
   /** The execution context for futures */
   implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
+  /** Gets a producer which can publish messages to the kafka bus. */
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val messageProducer = messagingProvider.getProducer(config, 
executionContext)
+
+  /**
+   * Subscribes to active acks (completion messages from the invokers), and
+   * registers a handler for received active acks from invokers.
+   */
+  val maxActiveAcksPerPoll = 128
+  val activeAckConsumer =
+    messagingProvider.getConsumer(config, "completions", 
s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
+
+  val maxOverflowMsgPerPoll = config.loadbalancerInvokerBusyThreshold //TODO: 
only pull enough messages that can be processed immediately
+  val overflowConsumer =
+    messagingProvider.getConsumer(config, "overflow", s"overflow", maxPeek = 
maxOverflowMsgPerPoll)
+
+  /** setup the LoadBalancerActor */
+  val lbActor = actorSystem.actorOf(
+    Props(new LoadBalancerActor(config, instance, invokerPool, 
activeAckConsumer, overflowConsumer, loadBalancerData)))
+  invokerPool ! SubscribeLoadBalancer(lbActor)
+
+  implicit val timeout = Timeout(30.seconds)
+
+  /** Gets the number of in-flight activations for a specific user. */
+  override def activeActivationsFor(namespace: UUID) = Future.successful(0)
+
+  /** Gets the number of in-flight activations in the system. */
+  override def totalActiveActivations = Future.successful(0)
+
+  /**
+   * Publishes activation message on internal bus for an invoker to pick up.
+   *
+   * @param action the action to invoke
+   * @param msg the activation message to publish on an invoker topic
+   * @param transid the transaction id for the request
+   * @return result a nested Future the outer indicating completion of 
publishing and
+   *         the inner the completion of the action (i.e., the result)
+   *         if it is ready before timeout (Right) otherwise the activation id 
(Left).
+   *         The future is guaranteed to complete within the declared action 
time limit
+   *         plus a grace period (see activeAckTimeoutGrace).
+   */
+  override def publish(action: ExecutableWhiskAction, msg: 
ActivationMessage)(implicit transid: TransactionId) = {
+    val res = lbActor.ask(Publish(action, 
msg)).mapTo[Future[Either[ActivationId, WhiskActivation]]]
+    res //Future.successful(res)
+  }
+
+  def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] =
+    invokerPool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, InvokerState)]]
+}
+
+class LoadBalancerActor(config: WhiskConfig,
+                        instance: InstanceId,
+                        invokerPool: ActorRef,
+                        activeAckConsumer: MessageConsumer,
+                        overflowConsumer: MessageConsumer,
+                        val loadBalancerData: LoadBalancerData)(implicit 
logging: Logging)
+    extends Actor {
+  override def postStop() {
+    activeAckConsumer.close()
+    overflowConsumer.close()
+  }
+  implicit val actorSystem = context.system
+  implicit val ec = context.dispatcher
+  val activeAckTimeoutGrace = 1.minute
+
+  var allInvokersLocal = IndexedSeq[(InstanceId, InvokerState)]()
+  val countersLocal = mutable.Map[InstanceId, Int]()
+  var localOverflowActivationCount: Int = 0
+  val overflowState = new AtomicBoolean(false)
+
   /** How many invokers are dedicated to blackbox images.  We range bound to 
something sensical regardless of configuration. */
   private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, 
config.controllerBlackboxFraction))
   logging.info(this, s"blackboxFraction = 
$blackboxFraction")(TransactionId.loadbalancer)
 
-  /** Feature switch for shared load balancer data **/
-  private val loadBalancerData = {
-    if (config.controllerLocalBookkeeping) {
-      new LocalLoadBalancerData()
-    } else {
+  override def receive = {
+    case Publish(action, msg) =>
+      sender() ! publish(action, msg)(msg.transid)
+    case StatusUpdate(invokers) =>
+      allInvokersLocal = invokers
+  }
 
-      /** Specify how seed nodes are generated */
-      val seedNodesProvider = new 
StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
-      new DistributedLoadBalancerData()
+  def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
+    val hash = generateHash(msg.user.namespace, action)
+    if (!overflowState.get()) {
+      sendToInvokerOrOverflow(msg, action, hash, action.exec.pull)
+    } else {
+      sendActivationToOverflow(
+        messageProducer,
+        OverflowMessage(transid, msg, 
action.limits.timeout.duration.toSeconds.toInt, hash, action.exec.pull, 
instance))
+        .flatMap { _ =>
+          val entry = setupActivation(action.limits.timeout.duration, 
msg.activationId, msg.user.uuid, None, transid)
+          entry.promise.future
+        }
     }
+
   }
 
-  override def activeActivationsFor(namespace: UUID) = 
loadBalancerData.activationCountOn(namespace)
+  /** Generates a hash based on the string representation of namespace and 
action */
+  private def generateHash(namespace: EntityName, action: 
ExecutableWhiskAction): Int = {
+    (namespace.asString.hashCode() ^ 
action.fullyQualifiedName(false).asString.hashCode()).abs
+  }
 
-  override def totalActiveActivations = loadBalancerData.totalActivationCount
+  private def sendToInvokerOrOverflow(msg: ActivationMessage, action: 
ExecutableWhiskAction, hash: Int, pull: Boolean)(
+    implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
+    val invMatched = chooseInvoker(hash, pull, false)
+    val entry = setupActivation(action.limits.timeout.duration, 
msg.activationId, msg.user.uuid, invMatched, transid)
 
-  override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
-    chooseInvoker(msg.user, action).flatMap { invokerName =>
-      val entry = setupActivation(action, msg.activationId, msg.user.uuid, 
invokerName, transid)
-      sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
-        entry.promise.future
-      }
+    invMatched match {
+      case Some(i) =>
+        LoadBalancerService.sendActivationToInvoker(messageProducer, msg, 
i).flatMap { _ =>
+          entry.promise.future
+        }
+      case None =>
+        if (overflowState.compareAndSet(false, true)) {
+          logging.info(this, "entering overflow state; no invokers have 
capacity")
+        }
+
+        sendActivationToOverflow(
+          messageProducer,
+          OverflowMessage(transid, msg, 
action.limits.timeout.duration.toSeconds.toInt, hash, pull, instance)).flatMap {
+          _ =>
+            entry.promise.future
+        }
     }
   }
 
-  /** An indexed sequence of all invokers in the current system */
-  def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] =
-    invokerPool
-      .ask(GetStatus)(Timeout(5.seconds))
-      .mapTo[IndexedSeq[(InstanceId, InvokerState)]]
-
   /**
    * Tries to fill in the result slot (i.e., complete the promise) when a 
completion message arrives.
    * The promise is removed from the map when the result arrives or upon 
timeout.
    *
-   * @param msg is the kafka message payload as Json
+   * @param response the ActivationId OR the WhiskActivation response
+   * @param tid transaction id
+   * @param forced true if this activation was expired before the active ack
+   * @param invoker Some(InstanceId) if it was sent to invoker, or None if it 
was overflow (expired)
    */
   private def processCompletion(response: Either[ActivationId, 
WhiskActivation],
                                 tid: TransactionId,
                                 forced: Boolean,
-                                invoker: InstanceId): Unit = {
+                                invoker: Option[InstanceId]): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
 
     // treat left as success (as it is the result of a message exceeding the 
bus limit)
@@ -143,13 +242,30 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
 
     loadBalancerData.removeActivation(aid) match {
       case Some(entry) =>
+        //cancel the scheduled timeout handler
+        timeouts.remove(aid).foreach(_.cancel())
         logging.info(this, s"${if (!forced) "received" else "forced"} active 
ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - 
health actions are not part of
         // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
         // If the active ack was forced, because the waiting period expired, 
treat it as a failed activation.
         // A cluster of such failures will eventually turn the invoker 
unhealthy and suspend queuing activations
         // to that invoker topic.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess && !forced)
+        entry.invokerName.foreach(invokerInstance => {
+          invokerPool ! InvocationFinishedMessage(invokerInstance, isSuccess 
&& !forced)
+          //if processing overflow that initiated elsewhere, propagate the 
completion
+          entry.originalController.foreach(controllerInstance => {
+            val msg = CompletionMessage(tid, response, invokerInstance)
+            messageProducer.send(s"completed${controllerInstance.toInt}", msg)
+          })
+        })
+
+        //if this is an entry for processing overflow, adjust overflow state 
if needed
+        if (entry.isOverflow) {
+          localOverflowActivationCount -= 1
+          if (overflowState.get() && localOverflowActivationCount == 0 && 
overflowState.compareAndSet(true, false)) {
+            logging.info(this, "removing overflow state after processing 
outstanding overflow messages")
+          }
+        }
         if (!forced) {
           entry.promise.trySuccess(response)
         } else {
@@ -161,7 +277,7 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
         // for activations that already timed out.
         // For both cases, it looks like the invoker works again and we should 
send the status of
         // the activation to the invokerPool.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+        invoker.foreach(invokerPool ! InvocationFinishedMessage(_, isSuccess))
         logging.debug(this, s"received active ack for '$aid' which has no 
entry")(tid)
       case None =>
         // the entry has already been removed by an active ack. This part of 
the code is reached by the timeout.
@@ -170,26 +286,38 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
     }
   }
 
+  val timeouts = mutable.Map[ActivationId, Cancellable]()
+
   /**
    * Creates an activation entry and insert into various maps.
    */
-  private def setupActivation(action: ExecutableWhiskAction,
+  private def setupActivation(actionTimeout: FiniteDuration,
                               activationId: ActivationId,
                               namespaceId: UUID,
-                              invokerName: InstanceId,
-                              transid: TransactionId): ActivationEntry = {
-    val timeout = action.limits.timeout.duration + activeAckTimeoutGrace
+                              invokerName: Option[InstanceId],
+                              transid: TransactionId,
+                              originalController: Option[InstanceId] = None,
+                              isOverflow: Boolean = false): ActivationEntry = {
+    val timeout = actionTimeout + activeAckTimeoutGrace
     // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
     // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
     // the active ack is significantly delayed (possibly due to long queues 
but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it 
and update the books.
-    loadBalancerData.putActivation(activationId, {
-      actorSystem.scheduler.scheduleOnce(timeout) {
-        processCompletion(Left(activationId), transid, forced = true, invoker 
= invokerName)
-      }
-
-      ActivationEntry(activationId, namespaceId, invokerName, 
Promise[Either[ActivationId, WhiskActivation]]())
-    })
+    loadBalancerData.putActivation(
+      activationId, {
+        timeouts.put(activationId, actorSystem.scheduler.scheduleOnce(timeout) 
{
+          processCompletion(Left(activationId), transid, forced = true, 
invokerName)
+        })
+
+        ActivationEntry(
+          activationId,
+          namespaceId,
+          invokerName,
+          Promise[Either[ActivationId, WhiskActivation]],
+          originalController,
+          isOverflow)
+      },
+      isOverflow)
   }
 
   /**
@@ -215,50 +343,25 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
   }
 
   /** Gets a producer which can publish messages to the kafka bus. */
-  private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config, 
executionContext)
+  val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val messageProducer = messagingProvider.getProducer(config, 
context.dispatcher)
 
-  private def sendActivationToInvoker(producer: MessageProducer,
-                                      msg: ActivationMessage,
-                                      invoker: InstanceId): 
Future[RecordMetadata] = {
+  private def sendActivationToOverflow(producer: MessageProducer, msg: 
OverflowMessage): Future[RecordMetadata] = {
     implicit val transid = msg.transid
 
-    val topic = s"invoker${invoker.toInt}"
+    val topic = "overflow"
     val start = transid.started(
       this,
       LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'")
+      s"posting overflow topic '$topic' with activation id 
'${msg.msg.activationId}'")
 
     producer.send(topic, msg).andThen {
       case Success(status) =>
+        localOverflowActivationCount += 1
         transid.finished(this, start, s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]")
       case Failure(e) => transid.failed(this, start, s"error on posting to 
topic $topic")
     }
   }
-  private val invokerPool = {
-    // Do not create the invokerPool if it is not possible to create the 
health test action to recover the invokers.
-    InvokerPool
-      .healthAction(instance)
-      .map {
-        // Await the creation of the test action; on failure, this will abort 
the constructor which should
-        // in turn abort the startup of the controller.
-        a =>
-          Await.result(createTestActionForInvokerHealth(entityStore, a), 
1.minute)
-      }
-      .orElse {
-        throw new IllegalStateException(
-          "cannot create test action for invoker health because runtime 
manifest is not valid")
-      }
-
-    val maxPingsPerPoll = 128
-    val pingConsumer =
-      messagingProvider.getConsumer(config, s"health${instance.toInt}", 
"health", maxPeek = maxPingsPerPoll)
-    val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) =>
-      f.actorOf(InvokerActor.props(invokerInstance, instance))
-
-    actorSystem.actorOf(
-      InvokerPool.props(invokerFactory, (m, i) => 
sendActivationToInvoker(messageProducer, m, i), pingConsumer))
-  }
 
   /**
    * Subscribes to active acks (completion messages from the invokers), and
@@ -266,8 +369,6 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
    */
   val maxActiveAcksPerPoll = 128
   val activeAckPollDuration = 1.second
-  private val activeAckConsumer =
-    messagingProvider.getConsumer(config, "completions", 
s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
   val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed(
       "activeack",
@@ -282,7 +383,7 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
     val raw = new String(bytes, StandardCharsets.UTF_8)
     CompletionMessage.parse(raw) match {
       case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, forced = false, invoker = 
m.invoker)
+        processCompletion(m.response, m.transid, forced = false, invoker = 
Some(m.invoker))
         activationFeed ! MessageFeed.Processed
 
       case Failure(t) =>
@@ -290,6 +391,80 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
         logging.error(this, s"failed processing message: $raw with $t")
     }
   }
+  // //TODO: only pull enough messages that can be processed immediately
+  val overflowPollDuration = 200.milliseconds
+
+  val offsetMonitor = actorSystem.actorOf(Props {
+    new Actor {
+      override def receive = {
+        case MessageFeed.MaxOffset =>
+          if (overflowState.compareAndSet(true, false)) {
+            logging.info(this, "resetting overflow state via offsetMonitor for 
overflow topic")
+          }
+      }
+    }
+  })
+
+  //ideally the overflow capacity should be dynamic, based on free invokers, 
to provide some backpressure. For now, capacity of 1
+  //(or some small number less than number of invokers) may be ok.
+  val overflowHandlerCapacity = overflowConsumer.maxPeek
+  val overflowFeed = actorSystem.actorOf(Props {
+    new MessageFeed(
+      "overflow",
+      logging,
+      overflowConsumer,
+      overflowHandlerCapacity,
+      overflowPollDuration,
+      processOverflow,
+      offsetMonitor = Some(offsetMonitor))
+  })
+
+  private def processOverflow(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    OverflowMessage.parse(raw) match {
+      case Success(m: OverflowMessage) =>
+        implicit val tid = m.msg.transid
+        logging.info(this, s"processing overflow msg for activation 
${m.msg.activationId}")
+        //remove from entries (will replace with an overflow entry if it 
exists locally)
+        val entryOption = loadBalancerData
+          .removeActivation(m.msg.activationId)
+
+        //process the activation request: update the invoker ref, and send to 
invoker
+        chooseInvoker(m.hash, m.pull, true) match {
+          case Some(instanceId) =>
+            //Update the invoker name for the overflow ActivationEntry
+            //The timeout for the activationId will still be effective.
+            entryOption match {
+              case Some(entry) =>
+                entry.invokerName = Some(instanceId)
+                loadBalancerData.putActivation(m.msg.activationId, entry, 
false)
+                LoadBalancerService.sendActivationToInvoker(messageProducer, 
m.msg, instanceId)
+              case None =>
+                //TODO: adjust the timeout for time spent in overflow topic!
+                val entry = setupActivation(
+                  m.actionTimeoutSeconds.seconds,
+                  m.msg.activationId,
+                  m.msg.user.uuid,
+                  Some(instanceId),
+                  m.msg.transid,
+                  Some(m.originalController),
+                  true)
+                loadBalancerData.putActivation(m.msg.activationId, entry, true)
+                val updatedMsg = m.msg.copy(rootControllerIndex = 
this.instance)
+                LoadBalancerService.sendActivationToInvoker(messageProducer, 
updatedMsg, instanceId)
+            }
+
+          case None =>
+            //if no invokers available, all activations will go to overflow 
queue till capacity is available again
+            logging.error(this, "invalid overflow processing; no invokers have 
capacity")
+          //TODO: should requeue to overflow?
+        }
+        overflowFeed ! MessageFeed.Processed
+
+      case Failure(t) =>
+        logging.error(this, s"failed processing overflow message: $raw with 
$t")
+    }
+  }
 
   /** Compute the number of blackbox-dedicated invokers by applying a rounded 
down fraction of all invokers (but at least 1). */
   private def numBlackbox(totalInvokers: Int) = Math.max(1, 
(totalInvokers.toDouble * blackboxFraction).toInt)
@@ -312,30 +487,14 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
   }
 
   /** Determine which invoker this activation should go to. Due to dynamic 
conditions, it may return no invoker. */
-  private def chooseInvoker(user: Identity, action: ExecutableWhiskAction): 
Future[InstanceId] = {
-    val hash = generateHash(user.namespace, action)
-
-    loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
-      allInvokers.flatMap { invokers =>
-        val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) 
else managedInvokers(invokers)
-        val invokersWithUsage = invokersToUse.view.map {
-          // Using a view defers the comparably expensive lookup to actual 
access of the element
-          case (instance, state) => (instance, state, 
currentActivations.getOrElse(instance.toString, 0))
-        }
-
-        LoadBalancerService.schedule(invokersWithUsage, 
config.loadbalancerInvokerBusyThreshold, hash) match {
-          case Some(invoker) => Future.successful(invoker)
-          case None =>
-            logging.error(this, s"all invokers 
down")(TransactionId.invokerHealth)
-            Future.failed(new LoadBalancerException("no invokers available"))
-        }
-      }
+  protected def chooseInvoker(hash: Int, pull: Boolean, overflow: Boolean): 
Option[InstanceId] = {
+    val invokersToUse = if (pull) blackboxInvokers(allInvokersLocal) else 
managedInvokers(allInvokersLocal)
+    val currentActivations = loadBalancerData.activationCountPerInvoker
+    val invokersWithUsage = invokersToUse.view.map {
+      // Using a view defers the comparably expensive lookup to actual access 
of the element
+      case (instance, state) => (instance, state, 
currentActivations.getOrElse(instance.toString, 0))
     }
-  }
-
-  /** Generates a hash based on the string representation of namespace and 
action */
-  private def generateHash(namespace: EntityName, action: 
ExecutableWhiskAction): Int = {
-    (namespace.asString.hashCode() ^ 
action.fullyQualifiedName(false).asString.hashCode()).abs
+    LoadBalancerService.schedule(invokersWithUsage, 
config.loadbalancerInvokerBusyThreshold, hash, overflow)
   }
 }
 
@@ -368,19 +527,18 @@ object LoadBalancerService {
 
   /**
    * Scans through all invokers and searches for an invoker, that has a queue 
length
-   * below the defined threshold. The threshold is subject to a 3 times back 
off. Iff
-   * no "underloaded" invoker was found it will default to the first invoker 
in the
-   * step-defined progression that is healthy.
+   * below the defined threshold. Iff no "underloaded" invoker was found, 
return None.
    *
    * @param invokers a list of available invokers to search in, including 
their state and usage
    * @param invokerBusyThreshold defines when an invoker is considered 
overloaded
    * @param hash stable identifier of the entity to be scheduled
+   * @param overflow scheduling during overflow processing (true) *must* find 
an invoker to use
    * @return an invoker to schedule to or None if no invoker is available
    */
-  def schedule(invokers: Seq[(InstanceId, InvokerState, Int)],
-               invokerBusyThreshold: Int,
-               hash: Int): Option[InstanceId] = {
+  def schedule(invokers: Seq[(InstanceId, InvokerState, Int)], 
invokerBusyThreshold: Int, hash: Int, overflow: Boolean)(
+    implicit logging: Logging): Option[InstanceId] = {
 
+    require(invokerBusyThreshold > 0, "invokerBusyThreshold should be > 0")
     val numInvokers = invokers.size
     if (numInvokers > 0) {
       val homeInvoker = hash % numInvokers
@@ -394,13 +552,75 @@ object LoadBalancerService {
         .map(invokers)
         .filter(_._2 == Healthy)
 
-      invokerProgression
-        .find(_._3 < invokerBusyThreshold)
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2))
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
-        .orElse(invokerProgression.headOption)
-        .map(_._1)
-    } else None
+      if (overflow) {
+        //should not arrive here without an invoker who is not busy! but just 
in case, use the step progression with incrementing busy-ness
+        invokerProgression
+          .find(_._3 < invokerBusyThreshold)
+          .orElse({
+            logging.warn(this, "scheduling to a busy invoker during overflow 
processing")
+            invokerProgression.find(_._3 < invokerBusyThreshold * 2)
+          })
+          .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
+          .orElse(invokerProgression.headOption)
+          .map(_._1)
+      } else {
+        invokerProgression
+          .find(_._3 < invokerBusyThreshold)
+          //don't consider invokers that have reached capacity when not in 
overflow state
+          .map(_._1)
+      }
+    } else {
+      logging.warn(this, "no invokers available")
+      None
+    }
+  }
+  def sendActivationToInvoker(producer: MessageProducer, msg: 
ActivationMessage, invoker: InstanceId)(
+    implicit logging: Logging,
+    ec: ExecutionContext): Future[RecordMetadata] = {
+    implicit val transid = msg.transid
+
+    val topic = s"invoker${invoker.toInt}"
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting topic '$topic' with activation id '${msg.activationId}'")
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(this, start, s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]")
+      case Failure(e) => transid.failed(this, start, s"error on posting to 
topic $topic")
+    }
+  }
+
+  def createInvokerPool(instance: InstanceId,
+                        actorSystem: ActorSystem,
+                        executionContext: ExecutionContext,
+                        entityStore: EntityStore,
+                        messageProducer: MessageProducer,
+                        healthConsumer: MessageConsumer)(implicit logging: 
Logging): ActorRef = {
+    implicit val ec: ExecutionContext = executionContext
+    // Do not create the invokerPool if it is not possible to create the 
health test action to recover the invokers.
+    InvokerPool
+      .healthAction(instance)
+      .map {
+        // Await the creation of the test action; on failure, this will abort 
the constructor which should
+        // in turn abort the startup of the controller.
+        a =>
+          
Await.result(InvokerPool.createTestActionForInvokerHealth(entityStore, a), 
1.minute)
+      }
+      .orElse {
+        throw new IllegalStateException(
+          "cannot create test action for invoker health because runtime 
manifest is not valid")
+      }
+
+    val invokerFactory =
+      (f: ActorRefFactory, invokerInstance: InstanceId) => 
f.actorOf(InvokerActor.props(invokerInstance, instance))
+
+    actorSystem.actorOf(
+      InvokerPool
+        .props(
+          invokerFactory,
+          (m, i) => 
LoadBalancerService.sendActivationToInvoker(messageProducer, m, i),
+          healthConsumer))
   }
 
 }
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
index 92e3789e76..dee21d081b 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
@@ -20,7 +20,6 @@ package whisk.core.loadBalancer
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
 import whisk.core.entity.{ActivationId, UUID}
 
 /**
@@ -36,27 +35,36 @@ class LocalLoadBalancerData() extends LoadBalancerData {
   private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
   private val activationsById = TrieMap[ActivationId, ActivationEntry]()
   private val totalActivations = new AtomicInteger(0)
+  private val overflowActivations = new AtomicInteger(0)
 
-  override def totalActivationCount: Future[Int] = 
Future.successful(totalActivations.get)
+  override def totalActivationCount: Int = totalActivations.get
 
-  override def activationCountOn(namespace: UUID): Future[Int] = {
-    
Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
+  override def activationCountOn(namespace: UUID): Int = {
+    activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
   }
 
-  override def activationCountPerInvoker: Future[Map[String, Int]] = {
-    Future.successful(activationByInvoker.toMap.mapValues(_.get))
+  override def activationCountPerInvoker: Map[String, Int] = {
+    activationByInvoker.toMap.mapValues(_.get)
   }
 
+  override def overflowActivationCount: Int = overflowActivations.get
+
   override def activationById(activationId: ActivationId): 
Option[ActivationEntry] = {
     activationsById.get(activationId)
   }
 
-  override def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry = {
+  override def putActivation(id: ActivationId,
+                             update: => ActivationEntry,
+                             isOverflow: Boolean = false): ActivationEntry = {
     activationsById.getOrElseUpdate(id, {
       val entry = update
       totalActivations.incrementAndGet()
       activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new 
AtomicInteger(0)).incrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new 
AtomicInteger(0)).incrementAndGet()
+      entry.invokerName match {
+        case Some(i) => activationByInvoker.getOrElseUpdate(i.toString, new 
AtomicInteger(0)).incrementAndGet()
+        case None    => overflowActivations.incrementAndGet()
+      }
+
       entry
     })
   }
@@ -65,7 +73,10 @@ class LocalLoadBalancerData() extends LoadBalancerData {
     activationsById.remove(entry.id).map { x =>
       totalActivations.decrementAndGet()
       activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new 
AtomicInteger(0)).decrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new 
AtomicInteger(0)).decrementAndGet()
+      entry.invokerName match {
+        case Some(i) => activationByInvoker.getOrElseUpdate(i.toString, new 
AtomicInteger(0)).decrementAndGet()
+        case None    => overflowActivations.decrementAndGet()
+      }
       x
     }
   }
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
index d0595d3ece..3b53fee9e3 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
@@ -20,31 +20,40 @@ package whisk.core.loadBalancer
 import akka.actor.{Actor, ActorLogging, ActorRef, Props}
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey}
+//import akka.cluster.ddata.Key
+import akka.cluster.ddata.ORMap
+import akka.cluster.ddata.ORMapKey
+import akka.cluster.ddata.PNCounterMapKey
+import akka.cluster.ddata.{DistributedData, PNCounterMap}
 import akka.cluster.ddata.Replicator._
 import whisk.common.AkkaLogging
+import whisk.core.entity.InstanceId
 
-case class IncreaseCounter(key: String, value: Long)
-case class DecreaseCounter(key: String, value: Long)
+case class IncreaseCounter(key: String, instance: InstanceId, value: Long)
+case class DecreaseCounter(key: String, instance: InstanceId, value: Long)
 case class ReadCounter(key: String)
 case class RemoveCounter(key: String)
+case class Updated(storageName: String, entries: Map[String, Map[Int, Int]])
+
 case object GetMap
 
 /**
  * Companion object to specify actor properties from the outside, e.g. name of 
the shared map and cluster seed nodes
  */
 object SharedDataService {
-  def props(storageName: String): Props =
-    Props(new SharedDataService(storageName))
+  def props(storageName: String, monitor: ActorRef): Props =
+    Props(new SharedDataService(storageName, monitor))
 }
 
-class SharedDataService(storageName: String) extends Actor with ActorLogging {
+class SharedDataService(storageName: String, monitor: ActorRef) extends Actor 
with ActorLogging {
 
   val replicator = DistributedData(context.system).replicator
 
   val logging = new AkkaLogging(context.system.log)
 
-  val storage = PNCounterMapKey[String](storageName)
+  val storage = ORMapKey[String, PNCounterMap[Int]](storageName) // 
PNCounterMapKey[String](storageName)
+
+  def instanceKey(instance: InstanceId) = 
PNCounterMapKey[Int](instance.toString)
 
   implicit val node = Cluster(context.system)
 
@@ -54,9 +63,11 @@ class SharedDataService(storageName: String) extends Actor 
with ActorLogging {
   override def preStart(): Unit = {
     replicator ! Subscribe(storage, self)
     node.subscribe(self, initialStateMode = InitialStateAsEvents, 
classOf[MemberEvent], classOf[UnreachableMember])
-    replicator ! Update(storage, PNCounterMap.empty[String], 
writeLocal)(_.remove(node, "0"))
+    replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], 
writeLocal)(_.remove(node, "0"))
+  }
+  override def postStop(): Unit = {
+    node.unsubscribe(self)
   }
-  override def postStop(): Unit = node.unsubscribe(self)
 
   /**
    * CRUD operations on the counter, process cluster member events for logging
@@ -64,35 +75,41 @@ class SharedDataService(storageName: String) extends Actor 
with ActorLogging {
    */
   def receive = {
 
-    case (IncreaseCounter(key, increment)) =>
-      replicator ! Update(storage, PNCounterMap.empty[String], 
writeLocal)(_.increment(key, increment))
-
-    case (DecreaseCounter(key, decrement)) =>
-      replicator ! Update(storage, PNCounterMap[String], 
writeLocal)(_.decrement(key, decrement))
-
+    case (IncreaseCounter(key, instance, increment)) =>
+      replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], 
writeLocal)(m => {
+        m + (key, m.getOrElse(key, 
PNCounterMap[Int]()).increment(instance.toInt, increment))
+      })
+    case (DecreaseCounter(key, instance, decrement)) =>
+      replicator ! Update(storage, ORMap.empty[String, PNCounterMap[Int]], 
writeLocal)(m => {
+        m + (key, m.getOrElse(key, 
PNCounterMap[Int]()).decrement(instance.toInt, decrement))
+      })
     case GetMap =>
       replicator ! Get(storage, readLocal, request = Some((sender())))
-
     case MemberUp(member) =>
       logging.info(this, "Member is Up: " + member.address)
-
     case MemberRemoved(member, previousStatus) =>
       logging.warn(this, s"Member is Removed: ${member.address} after 
$previousStatus")
-
-    case c @ Changed(_) =>
+    case c @ Changed(e) =>
       logging.debug(this, "Current elements: " + c.get(storage))
+      val res = c.get(storage).entries.mapValues(_.entries.mapValues(_.toInt))
+      if (res.nonEmpty) {
+        res.values.foreach(_.values.foreach(i => {
+          require(i >= 0, s"values cannot be less than 0 ${res}")
+        }))
+        monitor ! Updated(storageName, res)
+      }
 
     case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
-      val map = g.get(storage).entries
+      val map = g.get(storage).entries.mapValues(_.entries)
       replyTo ! map
 
     case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) =>
       if (g.get(storage).contains(key)) {
-        val response = g.get(storage).getValue(key).intValue()
-        replyTo ! response
+        val response = g.get(storage).getOrElse(key, PNCounterMap[Int]())
+        replyTo ! response.entries
       } else
         replyTo ! None
-
-    case _ => // ignore
+    case msg =>
+    // ignore
   }
 }
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala 
b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 9ab2973b78..c71e442c41 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -19,20 +19,25 @@ package whisk.core.connector.test
 
 import java.util.ArrayList
 import java.util.concurrent.LinkedBlockingQueue
-
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.collection.JavaConversions._
-
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.Record
 import common.StreamLogging
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
 
 import whisk.common.Counter
+import whisk.common.Logging
+import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.core.connector.MessageConsumer
 import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
+import whisk.core.entity.InstanceId
 
 class TestConnector(topic: String, override val maxPeek: Int, 
allowMoreThanMax: Boolean)
     extends MessageConsumer
@@ -110,3 +115,121 @@ class TestConnector(topic: String, override val maxPeek: 
Int, allowMoreThanMax:
   @volatile private var closed = false
   private var offset = -1L
 }
+
+/**
+ * In-memory [[MessagingProvider]] for tests. Every topic is backed by a
+ * `LinkedBlockingQueue` that is shared by all producers and consumers handed
+ * out by this object.
+ */
+object TestMessagingProvider extends MessagingProvider {
+
+  /** Topic name -> backing queue. Entries are created on first use and never removed here. */
+  val queues = mutable.Map[String, LinkedBlockingQueue[Message]]()
+
+  val instanceIdMap = mutable.Map[TestConsumer, InstanceId]()
+
+  /**
+   * Returns the queue backing `topic`, creating it on first use.
+   *
+   * `queues` is a plain mutable map, so all lookups are funnelled through this
+   * provider's monitor; previously only `getConsumer` took that lock while the
+   * producer and `occupancy` touched the map unguarded.
+   */
+  private def queueFor(topic: String): LinkedBlockingQueue[Message] = this.synchronized {
+    queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Message]())
+  }
+
+  /**
+   * Creates a consumer that drains the queue backing `topic`.
+   *
+   * `config`, `groupId` and `maxPollInterval` are not used by this test implementation.
+   */
+  override def getConsumer(config: WhiskConfig,
+                           groupId: String,
+                           topic: String,
+                           maxPeek: Int,
+                           maxPollInterval: FiniteDuration)(implicit logging: Logging): TestConsumer =
+    new TestConsumer(queueFor(topic), topic, maxPeek, allowMoreThanMax = false)
+
+  /**
+   * Creates a producer that appends to the per-topic queues of this provider.
+   *
+   * `config` and `ec` are not used by this test implementation.
+   */
+  override def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging) =
+    new MessageProducer with StreamLogging {
+
+      /** Metadata returned for a successful write; the offset field carries the current queue size. */
+      private def written(topic: String, queue: LinkedBlockingQueue[Message]): Future[RecordMetadata] =
+        Future.successful(
+          new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1))
+
+      /** Appends a single message to the queue of `topic`. */
+      def send(topic: String, msg: Message): Future[RecordMetadata] = {
+        val queue = queueFor(topic)
+        // same monitor as TestConsumer.peek, so a write never interleaves with a drain
+        queue.synchronized {
+          if (queue.offer(msg)) {
+            logging.info(this, s"put: $msg")
+            written(topic, queue)
+          } else {
+            logging.error(this, s"put failed: $msg")
+            Future.failed(new IllegalStateException("failed to write msg"))
+          }
+        }
+      }
+
+      /** Appends all of `msgs` to the queue of `topic`; an empty batch is a successful no-op. */
+      def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = {
+        val queue = queueFor(topic)
+        queue.synchronized {
+          // addAll returns false when the queue did not change, which is also the
+          // case for an empty batch; do not report that as a failed write
+          if (msgs.isEmpty || queue.addAll(msgs)) {
+            logging.info(this, s"put: ${msgs.length} messages")
+            written(topic, queue)
+          } else {
+            logging.error(this, s"put failed: ${msgs.length} messages")
+            Future.failed(new IllegalStateException("failed to write msg"))
+          }
+        }
+      }
+
+      def close() = {}
+
+      // NOTE(review): neither send nor sendBulk touches `counter`, so this value
+      // is driven only by calls to sentCount itself - confirm that is intended.
+      def sentCount() = counter.next()
+      val counter = new Counter()
+    }
+
+  /**
+   * Number of messages currently queued for `topic`.
+   *
+   * Throws `NoSuchElementException` if no consumer or producer has used `topic` yet.
+   */
+  def occupancy(topic: String): Int = this.synchronized {
+    queues(topic).size()
+  }
+}
+
+/**
+ * Test [[MessageConsumer]] that reads from an in-memory queue.
+ *
+ * @param queue backing queue the messages are drained from
+ * @param topic topic name reported with every peeked message
+ * @param maxPeek upper bound on messages per peek unless `allowMoreThanMax` is set
+ * @param allowMoreThanMax when true, a peek drains everything that is currently queued
+ */
+class TestConsumer(queue: LinkedBlockingQueue[Message], topic: String, val maxPeek: Int, allowMoreThanMax: Boolean)
+    extends MessageConsumer
+    with StreamLogging {
+
+  /** When set, `commit()` throws instead of succeeding. */
+  var throwCommitException = false
+
+  /** When set, `peek` returns nothing and leaves the queue untouched. */
+  @volatile var dontPeek: Boolean = false
+
+  @volatile private var closed = false
+
+  /** Offset assigned to the most recently peeked message; only advanced while holding the queue's monitor. */
+  var offset = 0L
+
+  /**
+   * Gets messages via a long poll. May or may not remove messages
+   * from the message connector. Use commit() to ensure messages are
+   * removed from the connector.
+   *
+   * This implementation always blocks for the full `duration` before
+   * returning, whether or not any messages were drained.
+   *
+   * @param duration for the long poll
+   * @return iterable collection (topic, partition, offset, bytes)
+   */
+  override def peek(duration: Duration): Iterable[(String, Int, Long, Array[Byte])] = {
+    require(!closed, "cannot operate on a closed consumer")
+    val batch =
+      if (dontPeek) List.empty
+      else
+        queue.synchronized {
+          val limit = if (allowMoreThanMax) Int.MaxValue else maxPeek
+          val drained = new ArrayList[Message]
+          queue.drainTo(drained, limit)
+          drained map { m =>
+            offset += 1
+            (topic, -1, offset, m.serialize.getBytes)
+          }
+        }
+    Thread.sleep(duration.toMillis)
+    batch
+  }
+
+  /**
+   * Commits offsets from last peek operation to ensure they are removed
+   * from the connector. Nothing needs committing here; the call only
+   * fails when `throwCommitException` is set.
+   */
+  override def commit(): Unit =
+    if (throwCommitException) throw new Exception("commit failed")
+
+  /** Closes consumer. */
+  override def close(): Unit = {
+    closed = true
+  }
+
+  /** Number of messages currently waiting in the backing queue. */
+  def occupancy: Int = queue.size()
+}
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala
new file mode 100644
index 0000000000..ad155c8b36
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/DistributedLoadBalancerDataTests.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import akka.actor.ActorSystem
+import akka.testkit.ImplicitSender
+import akka.testkit.TestKit
+import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
+import common.StreamLogging
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+//import scala.collection.mutable
+import scala.concurrent.Promise
+import whisk.core.entity.InstanceId
+import scala.concurrent.duration._
+import whisk.core.entity.ActivationId
+import whisk.core.entity.UUID
+import whisk.core.entity.WhiskActivation
+import whisk.core.loadBalancer.ActivationEntry
+import whisk.core.loadBalancer.DistributedLoadBalancerData
+import whisk.core.loadBalancer.Updated
+
+/**
+ * Akka settings for the actor system used by [[DistributedLoadBalancerDataTests]]:
+ * the cluster actor provider with netty remoting bound to 127.0.0.1:2555.
+ */
+object DistributedLoadBalancerDataConfig {
+  val config: String = """
+    akka.remote.netty.tcp {
+      hostname = "127.0.0.1"
+      port = 2555
+    }
+    akka.actor.provider = cluster
+    """
+}
+
+/**
+ * Exercises two [[DistributedLoadBalancerData]] instances that live in the same
+ * actor system and report to `testActor`, checking which values are visible
+ * locally right away and which only after an `Updated` notification.
+ */
+class DistributedLoadBalancerDataTests
+    extends TestKit(
+      ActorSystem("ControllerCluster", ConfigFactory.parseString(DistributedLoadBalancerDataConfig.config)))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  /**
+   * Shuts the actor system down once the suite has finished. Without this the
+   * system created in the constructor (and the remoting port configured for it)
+   * stays alive for the rest of the test JVM.
+   */
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    super.afterAll()
+  }
+
+  behavior of "DistributedLoadBalancerData"
+
+  val controller1 = InstanceId(123)
+  val controller2 = InstanceId(456)
+  val controller3 = InstanceId(789)
+
+  val invoker1 = InstanceId(0)
+  val invoker2 = InstanceId(1)
+
+  implicit val timeout = Timeout(5.seconds)
+
+  val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
+  val namespace1 = UUID()
+  val firstEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), namespace1, Some(invoker1), activationIdPromise)
+  val secondEntry: ActivationEntry =
+    ActivationEntry(ActivationId(), namespace1, Some(invoker2), activationIdPromise)
+  val firstOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), namespace1, None, activationIdPromise)
+  val secondOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), namespace1, None, activationIdPromise)
+
+  // both instances notify testActor, so the expectMsg* calls below see the Updated messages of either one
+  val lbd1 = new DistributedLoadBalancerData(controller1, Some(testActor))
+  val lbd2 = new DistributedLoadBalancerData(controller2, Some(testActor))
+
+  it should "reflect local changes immediately and replicated changes eventually" in {
+
+    //store 1 activation per lbd
+    lbd1.putActivation(firstEntry.id, firstEntry)
+    lbd2.putActivation(secondEntry.id, secondEntry)
+
+    //only local changes are visible before replication completes
+    lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 1
+    lbd1.activationCountPerInvoker shouldBe Map(firstEntry.invokerName.get.toString -> 1)
+    lbd1.activationById(firstEntry.id) shouldBe Some(firstEntry)
+
+    //1 Updated msg per storagename per LBD (4 total when not in overflow)
+    //we cannot predict order of these updates
+    expectMsgClass(classOf[Updated])
+    expectMsgClass(classOf[Updated])
+    expectMsgClass(classOf[Updated])
+    expectMsgClass(classOf[Updated])
+
+    //after replication, verify updates
+    lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 2
+    lbd1.activationCountPerInvoker shouldBe Map(
+      firstEntry.invokerName.get.toString -> 1,
+      secondEntry.invokerName.get.toString -> 1)
+    lbd1.activationById(firstEntry.id) shouldBe Some(firstEntry)
+    lbd2.activationById(secondEntry.id) shouldBe Some(secondEntry)
+
+    //both entries should NOT be visible to the other (only counters are replicated)
+    lbd2.activationById(firstEntry.id) shouldBe None
+    lbd1.activationById(secondEntry.id) shouldBe None
+
+    // clean up activations
+    lbd1.removeActivation(firstEntry.id)
+    lbd2.removeActivation(secondEntry.id)
+
+    //verify local changes on lbd1
+    lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 1
+    lbd1.activationCountPerInvoker shouldBe Map(
+      firstEntry.invokerName.get.toString -> 0,
+      secondEntry.invokerName.get.toString -> 1)
+    lbd1.activationById(firstEntry.id) shouldBe None
+
+    //verify local changes on lbd2
+    lbd2.activationCountOn(secondEntry.namespaceId) shouldBe 1
+    lbd2.activationCountPerInvoker shouldBe Map(
+      firstEntry.invokerName.get.toString -> 1,
+      secondEntry.invokerName.get.toString -> 0)
+    lbd2.activationById(secondEntry.id) shouldBe None
+
+    //wait for replication
+    expectMsgAllClassOf(classOf[Updated], classOf[Updated], classOf[Updated], classOf[Updated])
+
+    //verify replicated changes on lbd1
+    lbd1.activationCountOn(firstEntry.namespaceId) shouldBe 0
+    lbd1.activationCountPerInvoker shouldBe Map(
+      firstEntry.invokerName.get.toString -> 0,
+      secondEntry.invokerName.get.toString -> 0)
+
+    //verify replicated changes on lbd2
+    lbd2.activationCountOn(secondEntry.namespaceId) shouldBe 0
+    lbd2.activationCountPerInvoker shouldBe Map(
+      firstEntry.invokerName.get.toString -> 0,
+      secondEntry.invokerName.get.toString -> 0)
+
+  }
+
+}
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 0e06b9ff2a..20eaba40e3 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -96,6 +96,7 @@ class InvokerSupervisionTests
   def timeout(actor: ActorRef) = actor ! FSM.StateTimeout
 
   /** Queries all invokers for their state */
+  //TODO: test for Updated message (instead of querying for GetStatus each 
time)
   def allStates(pool: ActorRef) =
     Await.result(pool.ask(GetStatus).mapTo[IndexedSeq[(InstanceId, 
InvokerState)]], timeout.duration)
 
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala 
b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index af511021de..2e43b5c495 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -24,16 +24,16 @@ import org.scalatest.{FlatSpec, Matchers}
 import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
 import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, 
LocalLoadBalancerData}
 
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Promise}
 import whisk.core.entity.InstanceId
 
-import scala.concurrent.duration._
-
 class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
   val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-  val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), 
InstanceId(0), activationIdPromise)
-  val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), 
InstanceId(1), activationIdPromise)
+  val firstEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), 
Some(InstanceId(0)), activationIdPromise)
+  val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), 
Some(InstanceId(1)), activationIdPromise)
+  val firstOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), 
UUID(), None, activationIdPromise)
+  val secondOverflowEnty: ActivationEntry = ActivationEntry(ActivationId(), 
UUID(), None, activationIdPromise)
 
   val port = 2552
   val host = "127.0.0.1"
@@ -44,32 +44,64 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
     .withFallback(ConfigFactory.load())
 
   val actorSystemName = "controller-actor-system"
+  val instance = InstanceId(0)
 
   implicit val actorSystem = ActorSystem(actorSystemName, config)
 
-  def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = 
Await.result(f, timeout)
+  //def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = 
Await.result(f, timeout)
 
   behavior of "LoadBalancerData"
 
   it should "return the number of activations for a namespace" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 //    test all implementations
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-      await(lbd.activationCountPerInvoker) shouldBe 
Map(firstEntry.invokerName.toString -> 1)
+//      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+//      await(lbd.activationCountPerInvoker) shouldBe 
Map(firstEntry.invokerName.get.toString -> 1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
+      lbd.activationCountPerInvoker shouldBe 
Map(firstEntry.invokerName.get.toString -> 1)
+      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
+
+      // clean up after yourself
+      lbd.removeActivation(firstEntry.id)
+    }
+  }
+
+  it should "return actions for invokers only when instanceId is not None" in {
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
+    val localLoadBalancerData = new LocalLoadBalancerData()
+
+    val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
+    loadBalancerDataArray.map { lbd =>
+      lbd.putActivation(firstEntry.id, firstEntry)
+      lbd.putActivation(secondEntry.id, secondEntry)
+      lbd.putActivation(firstOverflowEnty.id, firstOverflowEnty)
+      lbd.putActivation(secondOverflowEnty.id, secondOverflowEnty)
+
+      val res = lbd.activationCountPerInvoker
+
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
+      res.get(secondEntry.invokerName.get.toString()) shouldBe Some(1)
+
       lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
+      lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
+
+      lbd.overflowActivationCount shouldBe 2
 
       // clean up after yourself
       lbd.removeActivation(firstEntry.id)
+      lbd.removeActivation(secondEntry.id)
+      lbd.removeActivation(firstOverflowEnty.id)
+      lbd.removeActivation(secondOverflowEnty.id)
     }
   }
 
   it should "return the number of activations for each invoker" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
@@ -77,10 +109,10 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
       lbd.putActivation(firstEntry.id, firstEntry)
       lbd.putActivation(secondEntry.id, secondEntry)
 
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
 
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
+      res.get(secondEntry.invokerName.get.toString()) shouldBe Some(1)
 
       lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
       lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
@@ -94,50 +126,56 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
 
   it should "remove activations and reflect that accordingly" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      lbd.putActivation(firstOverflowEnty.id, firstOverflowEnty)
+      val res = lbd.activationCountPerInvoker
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
+
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.overflowActivationCount shouldBe 1
 
       lbd.removeActivation(firstEntry)
+      lbd.removeActivation(firstOverflowEnty)
 
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
+      val resAfterRemoval = lbd.activationCountPerInvoker
+      resAfterRemoval.get(firstEntry.invokerName.get.toString()) shouldBe 
Some(0)
 
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 0
       lbd.activationById(firstEntry.id) shouldBe None
+
+      lbd.overflowActivationCount shouldBe 0
     }
 
   }
 
   it should "remove activations from all 3 maps by activation id" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
 
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
+      val res = lbd.activationCountPerInvoker
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
 
       lbd.removeActivation(firstEntry.id)
 
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
+      val resAfterRemoval = lbd.activationCountPerInvoker
+      resAfterRemoval.get(firstEntry.invokerName.get.toString()) shouldBe 
Some(0)
     }
 
   }
 
   it should "return None if the entry doesn't exist when we remove it" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
@@ -149,65 +187,79 @@ class LoadBalancerDataTests extends FlatSpec with 
Matchers with StreamLogging {
 
   it should "respond with different values accordingly" in {
 
-    val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), 
activationIdPromise)
+    val entry = ActivationEntry(ActivationId(), UUID(), Some(InstanceId(1)), 
activationIdPromise)
     val entrySameInvokerAndNamespace = entry.copy(id = ActivationId())
     val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = 
UUID())
+    val entryNoInvoker = entry.copy(id = ActivationId(), namespaceId = UUID(), 
invokerName = None)
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(entry.id, entry)
 
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      var res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(1)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      var res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(1)
 
       lbd.putActivation(entrySameInvokerAndNamespace.id, 
entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 2
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(2)
+
+      lbd.putActivation(entryNoInvoker.id, entryNoInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 2
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(2)
+      lbd.overflowActivationCount shouldBe 1
 
       lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(3)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 2
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(3)
 
       lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(2)
+
+      lbd.removeActivation(entryNoInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(2)
+      lbd.overflowActivationCount shouldBe 0
 
       // removing non existing entry doesn't mess up
       lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      res = lbd.activationCountPerInvoker
+      res.get(entry.invokerName.get.toString()) shouldBe Some(2)
 
       // clean up
       lbd.removeActivation(entry)
       lbd.removeActivation(entrySameInvoker.id)
+      lbd.removeActivation(entryNoInvoker)
     }
 
   }
 
   it should "not add the same entry more then once" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      val res = lbd.activationCountPerInvoker
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       lbd.putActivation(firstEntry.id, firstEntry)
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) 
shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker
+      resAfterAddingTheSameEntry.get(firstEntry.invokerName.get.toString()) 
shouldBe Some(1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       lbd.removeActivation(firstEntry)
       lbd.removeActivation(firstEntry)
@@ -217,7 +269,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
 
   it should "not evaluate the given block if an entry already exists" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
@@ -248,7 +300,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
 
   it should "not evaluate the given block even if an entry is different (but 
has the same id)" in {
 
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val distributedLoadBalancerData = new DistributedLoadBalancerData(instance)
     val localLoadBalancerData = new LocalLoadBalancerData()
 
     val loadBalancerDataArray = Array(localLoadBalancerData, 
distributedLoadBalancerData)
@@ -263,9 +315,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
 
       called shouldBe 1
 
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      val res = lbd.activationCountPerInvoker
+      res.get(firstEntry.invokerName.get.toString()) shouldBe Some(1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       // entry already exists, should not evaluate the block and change the 
state
       val entryAfterSecond = lbd.putActivation(entrySameId.id, {
@@ -275,9 +327,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers 
with StreamLogging {
 
       called shouldBe 1
       entry shouldBe entryAfterSecond
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) 
shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker
+      resAfterAddingTheSameEntry.get(firstEntry.invokerName.get.toString()) 
shouldBe Some(1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
     }
 
   }
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
index 3f3dca4794..fecc214bca 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
@@ -17,15 +17,17 @@
 
 package whisk.core.loadBalancer.test
 
+import common.StreamLogging
 import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
+import org.scalatest.FlatSpecLike
 import org.scalatest.Matchers
-import whisk.core.loadBalancer.LoadBalancerService
+import org.scalatest.junit.JUnitRunner
+
+import whisk.core.entity.InstanceId
 import whisk.core.loadBalancer.Healthy
+import whisk.core.loadBalancer.LoadBalancerService
 import whisk.core.loadBalancer.Offline
 import whisk.core.loadBalancer.UnHealthy
-import whisk.core.entity.InstanceId
 
 /**
  * Unit tests for the ContainerPool object.
@@ -34,7 +36,7 @@ import whisk.core.entity.InstanceId
  * of the ContainerPool object.
  */
 @RunWith(classOf[JUnitRunner])
-class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
+class LoadBalancerServiceObjectTests extends FlatSpecLike with Matchers with 
StreamLogging {
   behavior of "memoize"
 
   it should "not recompute a value which was already given" in {
@@ -78,34 +80,46 @@ class LoadBalancerServiceObjectTests extends FlatSpec with 
Matchers {
   def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
 
   it should "return None on an empty invokers list" in {
-    LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None
+    LoadBalancerService.schedule(IndexedSeq(), 1, 1, false) shouldBe None
+    LoadBalancerService.schedule(IndexedSeq(), 1, 1, true) shouldBe None
   }
 
   it should "return None on a list of offline/unhealthy invokers" in {
     val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), 
UnHealthy, 0))
 
-    LoadBalancerService.schedule(invs, 0, 1) shouldBe None
+    LoadBalancerService.schedule(invs, 1, 1, false) shouldBe None
+    LoadBalancerService.schedule(invs, 1, 1, true) shouldBe None
   }
 
   it should "schedule to the home invoker" in {
     val invs = invokers(10)
     val hash = 2
 
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash 
% invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, false) shouldBe 
Some(InstanceId(hash % invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, true) shouldBe 
Some(InstanceId(hash % invs.size))
   }
 
   it should "take the only online invoker" in {
     LoadBalancerService.schedule(
       IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), 
(InstanceId(2), Healthy, 0)),
-      0,
-      1) shouldBe Some(InstanceId(2))
+      1,
+      1,
+      false) shouldBe Some(InstanceId(2))
+
+    LoadBalancerService.schedule(
+      IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), 
(InstanceId(2), Healthy, 0)),
+      1,
+      1,
+      true) shouldBe Some(InstanceId(2))
+
   }
 
   it should "skip an offline/unhealthy invoker, even if its underloaded" in {
     val hash = 0
-    val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), 
UnHealthy, 0), (InstanceId(2), Healthy, 0))
+    val invs = IndexedSeq((InstanceId(0), Healthy, 11), (InstanceId(1), 
UnHealthy, 0), (InstanceId(2), Healthy, 0))
 
-    LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
+    LoadBalancerService.schedule(invs, 10, hash, false) shouldBe 
Some(InstanceId(2))
+    LoadBalancerService.schedule(invs, 10, hash, true) shouldBe 
Some(InstanceId(2))
   }
 
   it should "jump to the next invoker determined by a hashed stepsize if the 
home invoker is overloaded" in {
@@ -116,7 +130,8 @@ class LoadBalancerServiceObjectTests extends FlatSpec with 
Matchers {
     val invs = invokers(invokerCount).updated(targetInvoker, 
(InstanceId(targetInvoker), Healthy, 1))
     val step = 
hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
 
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash 
+ step) % invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, true) shouldBe 
Some(InstanceId((hash + step) % invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, false) shouldBe 
Some(InstanceId((hash + step) % invs.size))
   }
 
   it should "wrap the search at the end of the invoker list" in {
@@ -130,7 +145,8 @@ class LoadBalancerServiceObjectTests extends FlatSpec with 
Matchers {
 
     // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 
2 0 --> invoker0 is next target
     // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> 
invoker2 is next target and underloaded
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash 
+ step + step) % invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, true) shouldBe 
Some(InstanceId((hash + step + step) % invs.size))
+    LoadBalancerService.schedule(invs, 1, hash, false) shouldBe 
Some(InstanceId((hash + step + step) % invs.size))
   }
 
   it should "multiply its threshold in 3 iterations to find an invoker with a 
good warm-chance" in {
@@ -140,22 +156,30 @@ class LoadBalancerServiceObjectTests extends FlatSpec 
with Matchers {
     // even though invoker1 is not the home invoker in this case, it gets 
chosen over
     // the others because it's the first one encountered by the iteration 
mechanism to be below
     // the threshold of 3 * 16 invocations
-    LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+    LoadBalancerService.schedule(invs, 16, hash, true) shouldBe 
Some(InstanceId(0))
+    // when not in overflow state, won't iterate and progress the busy 
threshold
+    LoadBalancerService.schedule(invs, 16, hash, false) shouldBe None
   }
 
   it should "choose the home invoker if all invokers are overloaded even above 
the muliplied threshold" in {
     val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1), 
Healthy, 50), (InstanceId(2), Healthy, 49))
     val hash = 0 // home is 0, stepsize is 1
 
-    LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+    LoadBalancerService.schedule(invs, 16, hash, true) shouldBe 
Some(InstanceId(0))
+    // when not in overflow state, won't iterate and progress the busy 
threshold
+    LoadBalancerService.schedule(invs, 16, hash, false) shouldBe None
   }
 
   it should "transparently work with partitioned sets of invokers" in {
     val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), 
Healthy, 0), (InstanceId(5), Healthy, 0))
 
-    LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
-    LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
-    LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
-    LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
+    LoadBalancerService.schedule(invs, 1, 0, true) shouldBe Some(InstanceId(3))
+    LoadBalancerService.schedule(invs, 1, 1, true) shouldBe Some(InstanceId(4))
+    LoadBalancerService.schedule(invs, 1, 2, true) shouldBe Some(InstanceId(5))
+    LoadBalancerService.schedule(invs, 1, 3, true) shouldBe Some(InstanceId(3))
+    LoadBalancerService.schedule(invs, 1, 0, false) shouldBe 
Some(InstanceId(3))
+    LoadBalancerService.schedule(invs, 1, 1, false) shouldBe 
Some(InstanceId(4))
+    LoadBalancerService.schedule(invs, 1, 2, false) shouldBe 
Some(InstanceId(5))
+    LoadBalancerService.schedule(invs, 1, 3, false) shouldBe 
Some(InstanceId(3))
   }
 }
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala 
b/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala
new file mode 100644
index 0000000000..eb9384361f
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/OverflowTests.scala
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import akka.actor.Actor
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.testkit.ImplicitSender
+import akka.testkit.TestKit
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import common.StreamLogging
+import java.time.Instant
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable.ListBuffer
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.Future
+import spray.json.JsNumber
+import spray.json.JsObject
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.connector.ActivationMessage
+import whisk.core.connector.CompletionMessage
+import whisk.core.connector.MessagingProvider
+import whisk.core.connector.test.TestConsumer
+import whisk.core.loadBalancer.DistributedLoadBalancerData
+import whisk.core.loadBalancer.StaticSeedNodesProvider
+import whisk.core.loadBalancer.Updated
+import whisk.core.connector.test.TestMessagingProvider
+import whisk.core.entitlement.Privilege
+import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.AuthKey
+import whisk.core.entity.CodeExecAsString
+import whisk.core.entity.DocRevision
+import whisk.core.entity.EntityName
+import whisk.core.entity.EntityPath
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.core.entity.ExecManifest.RuntimeManifest
+import whisk.core.entity.ExecutableWhiskAction
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
+import whisk.core.entity.Secret
+import whisk.core.entity.Subject
+import whisk.core.entity.UUID
+import whisk.core.entity.WhiskActivation
+import whisk.core.entity.WhiskEntityStore
+import whisk.core.loadBalancer.GetStatus
+import whisk.core.loadBalancer.Healthy
+import whisk.core.loadBalancer.LoadBalancerActorService
+import whisk.core.loadBalancer.StatusUpdate
+import whisk.core.loadBalancer.SubscribeLoadBalancer
+import whisk.spi.SpiLoader
+import whisk.spi.TypesafeConfigClassResolver
+
+object LoadBlanacerTestKitConfig {
+  val config = """
+    akka.actor.provider = cluster
+    whisk.spi.MessagingProvider = 
whisk.core.connector.test.TestMessagingProvider
+    """
+}
+class OverflowTests
+    extends TestKit(ActorSystem("ControllerCluster", 
ConfigFactory.parseString(LoadBlanacerTestKitConfig.config)))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with MockFactory
+    with StreamLogging {
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  def invokers(n: Int, messagingProvider: MessagingProvider, whiskConfig: 
WhiskConfig) =
+    (0 until n).map(
+      i =>
+        (
+          InstanceId(i),
+          Healthy,
+          messagingProvider
+            .getConsumer(whiskConfig, "invokers", s"invoker${i}", 10, 
maxPollInterval = 50.milliseconds)))
+  def activation(id: ActivationId) =
+    WhiskActivation(
+      namespace = EntityPath("ns"),
+      name = EntityName("a"),
+      Subject(),
+      activationId = id,
+      start = Instant.now(),
+      end = Instant.now(),
+      response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(1)))),
+      duration = Some(123))
+
+  implicit val ec = system.dispatcher
+
+  behavior of "overflow"
+
+  //allow our config to feed the SpiLoader
+  TypesafeConfigClassResolver.config = system.settings.config
+
+  val whiskConfig = new WhiskConfig(
+    WhiskEntityStore.requiredProperties ++
+      ExecManifest.requiredProperties ++
+      Map(WhiskConfig.loadbalancerInvokerBusyThreshold -> "1"))
+
+  val invs = invokers(4, TestMessagingProvider, whiskConfig)
+  val poolActor = system.actorOf(
+    Props(new Actor {
+      override def receive = {
+        case SubscribeLoadBalancer(lbActor) =>
+          lbActor ! StatusUpdate(invs.map(i => (i._1, i._2)))
+        case GetStatus =>
+          sender() ! invs.map(i => (i._1, i._2))
+      }
+    }),
+    "testpool")
+  val producer = TestMessagingProvider.getProducer(whiskConfig, 
system.dispatcher)
+
+  ExecManifest.initialize(whiskConfig)
+  // handle on the entity datastore
+  val entityStore = WhiskEntityStore.datastore(whiskConfig)
+  val authKey = AuthKey(UUID(), Secret())
+
+  /** Specify how seed nodes are generated */
+  val seedNodesProvider = new 
StaticSeedNodesProvider(whiskConfig.controllerSeedNodes, system.name)
+  Cluster(system).joinSeedNodes(seedNodesProvider.getSeedNodes())
+
+  it should "switch to overflow once capacity is exhausted, and switch back to 
underflow once capacity is available" in {
+
+    val monitor = TestProbe()
+    val controllerInstance = 9
+    val messagingProvider = SpiLoader.get[MessagingProvider]
+//    val pingConsumer = createPingConsumer(messagingProvider, 
InstanceId(controllerInstance))
+//    val ackConsumer = createAckConsumer(messagingProvider, 
InstanceId(controllerInstance))
+//    val overflowConsumer = createOverflowConsumer(messagingProvider, 
InstanceId(controllerInstance))
+
+    val instance = InstanceId(controllerInstance)
+    val lbData = new DistributedLoadBalancerData(instance, Some(testActor))
+    val lb = new LoadBalancerActorService(
+      whiskConfig,
+      instance,
+      poolActor,
+//      pingConsumer,
+//      ackConsumer,
+//      overflowConsumer,
+      lbData)
+//    lb.updateInvokers(invs.map(i => (i._1, i._2)))
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
+    val action = ExecutableWhiskAction(EntityPath("actionSpace"), 
EntityName("actionName"), exec)
+
+    //verify underflow
+    //    lb.overflowState.get() shouldBe false
+    //TODO: use default (16) for threshold
+    val futures = ListBuffer[Future[Any]]()
+
+    //there is 1 invoker reserved for blackbox currently:
+    val numInvokers = invs.size - 1
+    val completions = ListBuffer[CompletionMessage]();
+    (1 to numInvokers).foreach(i => {
+//      futures += lb.publish(action, createActivation(action, idGen.make(), 
TransactionId(i)))(TransactionId(i))
+
+      val id = idGen.make()
+      //activations += id
+      futures += lb.publish(action, createActivation(action, id, 
TransactionId(i)))(TransactionId(i))
+      completions += CompletionMessage(TransactionId.testing, 
Right(activation(id)), invs(i - 1)._1)
+    })
+    //wait for queueing
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 1
+      TestMessagingProvider.occupancy("invoker1") shouldBe 1
+      TestMessagingProvider.occupancy("invoker2") shouldBe 1
+    }
+    //wait for replication
+//    eventually(timeout(5000 millis), interval(50 millis)) {
+    expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 3))))
+    expectMsg(
+      Updated(
+        "Invokers",
+        Map("InstanceId(1)" -> Map(9 -> 1), "InstanceId(2)" -> Map(9 -> 1), 
"InstanceId(0)" -> Map(9 -> 1))))
+    //  }
+    monitor.expectNoMsg()
+    //disable reading from overflow
+    lb.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true
+//    TestMessagingProvider.paused += "overflow"
+
+    //1 more message will overflow
+    val id = idGen.make()
+    futures += lb.publish(action, createActivation(action, id, 
TransactionId(100)))(TransactionId(100))
+
+    //monitor.expectMsg(Overflow)
+    //wait for replication
+//    expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 1))))
+//    expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 
4))))
+
+    //process messages
+    (1 to invs.size - 1).foreach(index => {
+      val i = invs(index - 1)
+      val msgs = i._3.peek(100.milliseconds)
+      //println(s"found ${msgs.size} in invoker${i._1}")
+      i._3.commit()
+    })
+
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("overflow") shouldBe 1
+      TestMessagingProvider.occupancy("invoker0") shouldBe 0
+      TestMessagingProvider.occupancy("invoker1") shouldBe 0
+      TestMessagingProvider.occupancy("invoker2") shouldBe 0
+    }
+    //reenable the overflow processing
+    lb.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = false
+//    TestMessagingProvider.paused -= "overflow"
+    //send 1 completion
+    //val id = activations.remove(0).activationId
+    //val completion = CompletionMessage(TransactionId.testing, 
Right(activation(id)), InstanceId(0))
+//    val completion = CompletionMessage(TransactionId.testing, 
Right(activation(id)), InstanceId(0))
+//
+//    producer.send(s"completed${controllerInstance}", completion)
+
+    //verify underflow
+    //monitor.expectMsg(Underflow)
+
+    //complete the other activations
+//    val lastActivation = activations.size
+//    (1 to lastActivation).foreach(i => {
+//      val id = activations.remove(0).activationId
+//      val completion = CompletionMessage(TransactionId.testing, 
Right(activation(id)), InstanceId(i))
+//      producer.send(s"completed${controllerInstance}", completion)
+//
+//    })
+    completions.foreach(producer.send(s"completed${controllerInstance}", _))
+
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker1") shouldBe 1
+
+    }
+
+    val msgs = invs(1)._3.peek(100.milliseconds)
+    //println(s"found ${msgs.size} in invoker${invs(1)._1}")
+
+//    expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 
4))))
+//    expectMsg(
+//      Updated(
+//        "Invokers",
+//        Map("InstanceId(0)" -> Map(8 -> 0), "InstanceId(1)" -> Map(8 -> 1), 
"InstanceId(2)" -> Map(8 -> 0))))
+
+    //invs(1)._3.commit()
+
+//    //wait for replication
+//    eventually(timeout(5000 millis), interval(50 millis)) {
+//      expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(8 -> 
1))))
+//      expectMsg(
+//        Updated(
+//          "Invokers",
+//          Map("InstanceId(0)" -> Map(8 -> 0), "InstanceId(1)" -> Map(8 -> 
1), "InstanceId(2)" -> Map(8 -> 0))))
+//    }
+
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      expectMsg(Updated("Overflow", Map("overflow" -> Map(9 -> 0))))
+    }
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      //expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 0))))
+      TestMessagingProvider.occupancy("invoker0") shouldBe 0
+      TestMessagingProvider.occupancy("invoker1") shouldBe 0
+      TestMessagingProvider.occupancy("invoker2") shouldBe 0
+      TestMessagingProvider.occupancy("overflow") shouldBe 0
+
+    }
+
+    //println(s"sending ${completions.size} completions")
+    completions.foreach(producer.send(s"completed${controllerInstance}", _))
+    //println("waiting for completed9 to drain...")
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy(s"completed${controllerInstance}") 
shouldBe 0
+    }
+    //println("drain completed...")
+    Await.ready(Future.sequence(futures), 5.seconds)
+
+    //send completion for the overflow msg
+
+    producer.send(
+      s"completed${controllerInstance}",
+      CompletionMessage(TransactionId.testing, Right(activation(id)), 
invs(1)._1))
+
+    //wait for replication
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 
0))))
+      //expectMsg(Updated("Overflow", Map("overflow" -> Map(8 -> 0))))
+      expectMsg(
+        Updated(
+          "Invokers",
+          Map("InstanceId(0)" -> Map(9 -> 0), "InstanceId(1)" -> Map(9 -> 0), 
"InstanceId(2)" -> Map(9 -> 0))))
+    }
+
+//    lb.overflowConsumer.close()
+//    lb.activeAckConsumer.close()
+
+    lb.lbActor ! PoisonPill
+  }
+
+  it should "allow controller1 to process requests from controller0 when 
controller0 goes into overflow" in {
+
+    val monitor = TestProbe()
+    val messagingProvider = SpiLoader.get[MessagingProvider]
+    //configure lb1 to NOT process overflow messages
+    val instance1 = new InstanceId(9)
+    val instance2 = InstanceId(10)
+//    val pingConsumer = createPingConsumer(messagingProvider, instance1)
+    val maxPingsPerPoll = 54
+
+    val maxActiveAcksPerPoll = 54
+    val maxOverflowPerPoll = 5
+    val overflowCapacity = Some(5)
+
+//    val activeAckConsumer1 = createAckConsumer(messagingProvider, instance1)
+//    val activeAckConsumer2 = createAckConsumer(messagingProvider, instance2)
+//    val overflowConsumer1 = createOverflowConsumer(messagingProvider, 
instance1)
+//    val overflowConsumer2 = createOverflowConsumer(messagingProvider, 
instance2)
+
+    //start with nothing queued
+    TestMessagingProvider.occupancy("invoker0") shouldBe 0
+    TestMessagingProvider.occupancy("invoker1") shouldBe 0
+    TestMessagingProvider.occupancy("invoker2") shouldBe 0
+
+    //disable reading from overflow on lb1
+//    lb1.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true
+//    TestMessagingProvider.paused += "overflow"
+
+    val lbData1 = new DistributedLoadBalancerData(instance1, Some(testActor))
+    val lbData2 = new DistributedLoadBalancerData(instance2, Some(testActor))
+    val lb1 =
+      new LoadBalancerActorService(whiskConfig, InstanceId(9), poolActor, 
lbData1)
+    lb1.overflowConsumer.asInstanceOf[TestConsumer].dontPeek = true
+    lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true
+    TestMessagingProvider.occupancy("overflow") shouldBe 0
+
+    val lb2 =
+      new LoadBalancerActorService(whiskConfig, InstanceId(10), poolActor, 
lbData2)
+//    lb1.updateInvokers(invs.map(i => (i._1, i._2)))
+//    lb2.updateInvokers(invs.map(i => (i._1, i._2)))
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
+    val action = ExecutableWhiskAction(EntityPath("actionSpace"), 
EntityName("actionName"), exec)
+
+    //verify underflow
+    //    lb.overflowState.get() shouldBe false
+    //TODO: use default (16) for threshold
+    val futures = ListBuffer[Future[Any]]()
+
+    //there is 1 invoker reserved for blackbox currently:
+    val numInvokers = invs.size - 1
+    //send 1 activation per invoker
+    val activations = ListBuffer[ActivationId]();
+    val completions = ListBuffer[CompletionMessage]();
+    (1 to numInvokers).foreach(i => {
+      val id = idGen.make()
+      activations += id
+      futures += lb1.publish(action, createActivation(action, id, 
TransactionId(i)))(TransactionId(i))
+      completions += CompletionMessage(TransactionId.testing, 
Right(activation(id)), invs(i - 1)._1)
+    })
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 
3))))
+//      expectMsg(
+//        Updated(
+//          "Invokers",
+//          Map("InstanceId(0)" -> Map(9 -> 1), "InstanceId(1)" -> Map(9 -> 
1), "InstanceId(2)" -> Map(9 -> 1))))
+
+    }
+    //
+
+    //wait for queueing
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 1
+      TestMessagingProvider.occupancy("invoker1") shouldBe 1
+      TestMessagingProvider.occupancy("invoker2") shouldBe 1
+    }
+
+    //TestMessagingProvider.consumers("overflow").dontPeek = true
+    //overflowConsumer1.asInstanceOf[TestConsumer].dontPeek = true
+
+    //publish one more to cause overflow
+    val overflowedActivation = createActivation(action, idGen.make(), 
TransactionId(4))
+    futures += lb1.publish(action, overflowedActivation)(TransactionId(4))
+
+    //wait for overflow queueing
+    //monitor.expectMsg(Overflow)
+
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 
4))))
+
+    }
+
+    //wait for overflow draining
+    //make sure it was sent to invoker1
+    eventually(timeout(15000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker1") shouldBe 2
+
+    }
+    //disable processing of acks in both lbs, to verify queue routing
+    lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true
+    lb2.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = true
+//    TestMessagingProvider.paused += "completed9"
+//    TestMessagingProvider.paused += "completed10"
+
+    //emulate completion in invoker TODO: create an invoker emulator
+    val id = overflowedActivation.activationId
+    val completion = CompletionMessage(TransactionId.testing, 
Right(activation(id)), InstanceId(1))
+    producer.send(s"completed10", completion)
+    //we should first get a completion for lb2 (where overflow was processed)
+    //TestMessagingProvider.occupancy("completed10") shouldBe 1
+
+    //we should first get a completion for lb2 (where overflow was processed)
+    eventually(timeout(5000 millis), interval(50 millis)) {
+//      activeAckConsumer2.asInstanceOf[TestConsumer].offset shouldBe 1
+      TestMessagingProvider.occupancy("completed10") shouldBe 1
+    }
+
+    //disable lb1 ack processing
+    //println("disabling peek on lb1")
+    //activeAckConsumer1.asInstanceOf[TestConsumer].dontPeek = true
+    //TestMessagingProvider.paused -= "completed9"
+
+    //reenable lb2 ack processing
+    lb2.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = false
+//    TestMessagingProvider.paused -= "completed10"
+
+    //println("processing ack2 again")
+    //we should then get a completion for lb1 (where initial publish occurred)
+    eventually(timeout(5000 millis), interval(50 millis)) {
+//      activeAckConsumer1.asInstanceOf[TestConsumer].offset shouldBe 1
+      TestMessagingProvider.occupancy("completed9") shouldBe 1
+    }
+
+    //reenable lb1 ack processing
+    lb1.activeAckConsumer.asInstanceOf[TestConsumer].dontPeek = false
+//    TestMessagingProvider.paused -= "completed9"
+
+    //monitor.expectMsg(Underflow)
+    //println(s"waiting for ${futures.size} futures")
+
+    completions.foreach(producer.send(s"completed9", _))
+
+    Await.ready(Future.sequence(futures), 5.seconds)
+
+//    lb1.activeAckConsumer.close()
+//    lb2.activeAckConsumer.close()
+//    lb1.overflowConsumer.close()
+//    lb2.overflowConsumer.close()
+    lb1.lbActor ! PoisonPill
+    lb2.lbActor ! PoisonPill
+
+    invs.foreach(i => {
+      val msgs = i._3.peek(100.milliseconds)
+      //println(s"found ${msgs.size} in invoker${i._1}")
+    })
+    //end with nothing queued
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 0
+      TestMessagingProvider.occupancy("invoker1") shouldBe 0
+      TestMessagingProvider.occupancy("invoker2") shouldBe 0
+      TestMessagingProvider.occupancy("overflow") shouldBe 0
+    }
+  }
+  it should "rely on replicated data to decide" in {
+
+    val monitor = TestProbe()
+    val messagingProvider = SpiLoader.get[MessagingProvider]
+    //configure lb1 to NOT process overflow messages
+    val instance1 = InstanceId(9)
+    val instance2 = InstanceId(10)
+    val pingConsumer = createPingConsumer(messagingProvider, instance1)
+    val maxPingsPerPoll = 54
+
+    val maxActiveAcksPerPoll = 54
+    val maxOverflowPerPoll = 5
+    val overflowCapacity = Some(5)
+
+    val activeAckConsumer1 = createAckConsumer(messagingProvider, instance1)
+    val activeAckConsumer2 = createAckConsumer(messagingProvider, instance2)
+    val overflowConsumer1 = createOverflowConsumer(messagingProvider, 
instance1)
+    val overflowConsumer2 = createOverflowConsumer(messagingProvider, 
instance2)
+
+    val lbData1 = new DistributedLoadBalancerData(instance1, Some(testActor))
+    val lbData2 = new DistributedLoadBalancerData(instance2, Some(testActor))
+
+    //start with nothing queued
+    TestMessagingProvider.occupancy("invoker0") shouldBe 0
+    TestMessagingProvider.occupancy("invoker1") shouldBe 0
+    TestMessagingProvider.occupancy("invoker2") shouldBe 0
+    TestMessagingProvider.occupancy("overflow") shouldBe 0
+
+    //disable reading from overflow on lb1
+    overflowConsumer1.asInstanceOf[TestConsumer].dontPeek = true
+
+    val lb1 =
+      new LoadBalancerActorService(
+        whiskConfig,
+        instance1,
+//        entityStore,
+        poolActor,
+//        pingConsumer,
+//        activeAckConsumer1,
+//        overflowConsumer1,
+        lbData1)
+    val lb2 = new LoadBalancerActorService(
+      whiskConfig,
+      instance2,
+//      entityStore,
+      poolActor,
+//      pingConsumer,
+//      activeAckConsumer2,
+//      overflowConsumer2,
+      lbData2)
+//    lb1.updateInvokers(invs.map(i => (i._1, i._2)))
+//    lb2.updateInvokers(invs.map(i => (i._1, i._2)))
+
+    val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
+    val action = ExecutableWhiskAction(EntityPath("actionSpace"), 
EntityName("actionName"), exec)
+
+    //verify underflow
+    //    lb.overflowState.get() shouldBe false
+    //TODO: use default (16) for threshold
+    val futures = ListBuffer[Future[Any]]()
+
+    //there is 1 invoker reserved for blackbox currently:
+    val numInvokers = invs.size - 1
+    //send 1 activation per invoker
+    val activations = ListBuffer[ActivationId]();
+    val completions = ListBuffer[CompletionMessage]();
+    (1 to numInvokers - 1).foreach(i => {
+      val id = idGen.make()
+      activations += id
+      futures += lb1.publish(action, createActivation(action, id, 
TransactionId(i)))(TransactionId(i))
+      completions += CompletionMessage(TransactionId.testing, 
Right(activation(id)), invs(i - 1)._1)
+    })
+
+    //DO NOT WAIT FOR REPLICATION HERE
+
+    //wait for queueing - the action hash will cause scheduling to start with 
invoker1
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 0
+      TestMessagingProvider.occupancy("invoker1") shouldBe 1
+      TestMessagingProvider.occupancy("invoker2") shouldBe 1
+    }
+    (1 to 1).foreach(i => {
+      val id = idGen.make()
+      activations += id
+      futures += lb2.publish(action, createActivation(action, id, 
TransactionId(i)))(TransactionId(i))
+      completions += CompletionMessage(TransactionId.testing, 
Right(activation(id)), invs(i - 1)._1)
+    })
+
+    //before replication, the scheduling will still start at invoker1
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 0
+      TestMessagingProvider.occupancy("invoker1") shouldBe 2
+      TestMessagingProvider.occupancy("invoker2") shouldBe 1
+    }
+    //wait for replication
+    //expectMsgClass(classOf[Updated])
+
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      //without shared replicator:
+      expectMsg(Updated("Namespaces", Map(authKey.uuid.toString -> Map(9 -> 2, 
10 -> 1))))
+      expectMsg(
+        Updated(
+          "Invokers",
+          Map("InstanceId(1)" -> Map(9 -> 1, 10 -> 1), "InstanceId(2)" -> 
Map(9 -> 1), "InstanceId(0)" -> Map(9 -> 0))))
+      //with shared replicator:
+
+    }
+
+    (1 to 1).foreach(i => {
+      val id = idGen.make()
+      activations += id
+      futures += lb2.publish(action, createActivation(action, id, 
TransactionId(i)))(TransactionId(i))
+      completions += CompletionMessage(TransactionId.testing, 
Right(activation(id)), invs(i - 1)._1)
+    })
+
+    //after replication, the scheduling will now start at invoker0
+    eventually(timeout(5000 millis), interval(50 millis)) {
+      TestMessagingProvider.occupancy("invoker0") shouldBe 1
+      TestMessagingProvider.occupancy("invoker1") shouldBe 2
+      TestMessagingProvider.occupancy("invoker2") shouldBe 1
+    }
+
+    lb1.lbActor ! PoisonPill
+    lb2.lbActor ! PoisonPill
+
+  }
+
+  val idGen = new ActivationId.ActivationIdGenerator {}
+
+  val activations = mutable.ListBuffer[ActivationMessage]()
+
+  def createActivation(action: ExecutableWhiskAction, id: ActivationId, 
transid: TransactionId) = {
+    val a = ActivationMessage(
+      transid = transid,
+      action = action.fullyQualifiedName(true),
+      revision = DocRevision.empty,
+      user = Identity(Subject("unhealthyInvokerCheck"), 
EntityName("unhealthyInvokerCheck"), authKey, Set[Privilege]()),
+      activationId = id,
+      activationNamespace = EntityPath("guest"),
+      rootControllerIndex = InstanceId(0),
+      blocking = false,
+      content = None)
+    activations += a
+    a
+  }
+
+  def createPingConsumer(messagingProvider: MessagingProvider, instanceId: 
InstanceId) =
+    messagingProvider.getConsumer(
+      whiskConfig,
+      s"health${instanceId.toInt}",
+      "health",
+      maxPeek = 128,
+      maxPollInterval = 200.millis)
+  def createAckConsumer(messagingProvider: MessagingProvider, instanceId: 
InstanceId) =
+    messagingProvider.getConsumer(
+      whiskConfig,
+      "completions",
+      s"completed${instanceId.toInt}",
+      maxPeek = 128,
+      maxPollInterval = 200.millis)
+  def createOverflowConsumer(messagingProvider: MessagingProvider, instanceId: 
InstanceId) =
+    messagingProvider.getConsumer(whiskConfig, "overflow", s"overflow", 
maxPeek = 1, maxPollInterval = 200.millis)
+}
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
index 3961e539ca..0774fb3ef9 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
@@ -25,8 +25,8 @@ import com.typesafe.config.ConfigFactory
 import org.scalatest._
 import whisk.core.loadBalancer._
 import org.scalatest.FlatSpecLike
-
 import scala.concurrent.duration._
+import whisk.core.entity.InstanceId
 
 // Define your test specific configuration here
 
@@ -61,7 +61,9 @@ class SharedDataServiceTests()
     .withFallback(ConfigFactory.load())
 
   val s = ActorSystem("controller-actor-system", config)
-  val sharedDataService = s.actorOf(SharedDataService.props("Candidates"), 
name = "busyMan")
+  val storageName = "Candidates"
+  val instance = InstanceId(123)
+  val sharedDataService = s.actorOf(SharedDataService.props(storageName, 
testActor), name = "busyMan")
   implicit val timeout = Timeout(5.seconds)
 
   it should "retrieve an empty map after initialization" in {
@@ -70,22 +72,37 @@ class SharedDataServiceTests()
     expectMsg(msg)
   }
   it should "increase the counter" in {
-    sharedDataService ! (IncreaseCounter("Donald", 1))
+    sharedDataService ! (IncreaseCounter("Donald", instance, 1))
+    val msg = Map("Donald" -> Map(instance.toInt -> 1))
+    expectMsg(Updated(storageName, msg))
     sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
     expectMsg(msg)
   }
   it should "decrease the counter" in {
-    sharedDataService ! (IncreaseCounter("Donald", 2))
-    sharedDataService ! (DecreaseCounter("Donald", 2))
+    //verify starting at 1
     sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
+    val msg = Map("Donald" -> Map(instance.toInt -> 1))
     expectMsg(msg)
+
+    //increase and verify change
+    sharedDataService ! (IncreaseCounter("Donald", instance, 2))
+    val msg2 = Map("Donald" -> Map(instance.toInt -> 3))
+    expectMsg(Updated(storageName, msg2))
+    sharedDataService ! GetMap
+    expectMsg(msg2)
+
+    //decrease and verify change
+    sharedDataService ! (DecreaseCounter("Donald", instance, 2))
+    val msg3 = Map("Donald" -> Map(instance.toInt -> 1))
+    expectMsg(Updated(storageName, msg3))
+    sharedDataService ! GetMap
+    expectMsg(msg3)
   }
   it should "receive the map with all counters" in {
-    sharedDataService ! (IncreaseCounter("Hilary", 1))
+    sharedDataService ! (IncreaseCounter("Hilary", instance, 1))
+    val msg = Map("Hilary" -> Map(instance.toInt -> 1), "Donald" -> 
Map(instance.toInt -> 1))
+    expectMsg(Updated(storageName, msg))
     sharedDataService ! GetMap
-    val msg = Map("Hilary" -> 1, "Donald" -> 1)
     expectMsg(msg)
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to