This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8930735  Send active ack on failed activations (#2384)
8930735 is described below

commit 8930735e16eef37b22df57e3430bca6d75e0a6c7
Author: rodric rabbah <rod...@gmail.com>
AuthorDate: Fri Jun 23 11:57:48 2017 -0400

    Send active ack on failed activations (#2384)
    
    * Sends active ack for failed activations due to container startup failures.
    
    Add recovery in loadbalancer for when invoker dies and does not send active 
ack.
    Consolidate all blocking request handling into a single stateful actor.
    Modify completion message to respond with either id/result for when the 
active ack might exceed bus limit.
    Retry failure to send active ack from invoker.
---
 .../src/main/scala/whisk/common/Scheduler.scala    |  56 ++++-
 .../main/scala/whisk/core/connector/Message.scala  |  12 +-
 .../main/scala/whisk/core/entity/InstanceId.scala  |  13 +-
 .../whisk/utils/ExecutionContextFactory.scala      |  12 +-
 .../main/scala/whisk/core/controller/Actions.scala |  16 +-
 .../scala/whisk/core/controller/WebActions.scala   |  17 +-
 .../controller/actions/PostActionActivation.scala  |  26 ++-
 .../core/controller/actions/PrimitiveActions.scala | 229 +++++++++++++--------
 .../core/controller/actions/SequenceActions.scala  |  77 +++----
 .../core/loadBalancer/LoadBalancerService.scala    |  73 ++++---
 .../main/scala/whisk/core/invoker/Invoker.scala    |  49 +++--
 .../scala/whisk/core/invoker/InvokerReactive.scala |  18 +-
 .../test/scala/whisk/common/SchedulerTests.scala   |  60 ++++++
 .../connector/tests/CompletionMessageTests.scala   |  84 ++++++++
 .../controller/test/ControllerTestCommon.scala     |   4 +-
 .../core/controller/test/WebActionsApiTests.scala  |  13 +-
 .../whisk/core/limits/ActionLimitsTests.scala      |  52 +++--
 17 files changed, 542 insertions(+), 269 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala 
b/common/scala/src/main/scala/whisk/common/Scheduler.scala
index 506760a..b51f75a 100644
--- a/common/scala/src/main/scala/whisk/common/Scheduler.scala
+++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala
@@ -25,6 +25,7 @@ import scala.util.Try
 
 import akka.actor.Actor
 import akka.actor.ActorSystem
+import akka.actor.Cancellable
 import akka.actor.Props
 
 /**
@@ -32,7 +33,8 @@ import akka.actor.Props
  * even for asynchronous tasks.
  */
 object Scheduler {
-    private case object Work
+    case object WorkOnceNow
+    private case object ScheduledWork
 
     /**
      * Sets up an Actor to send itself a message to mimic schedulers behavior 
in a more controllable way.
@@ -41,24 +43,40 @@ object Scheduler {
      * @param alwaysWait always wait for the given amount of time or calculate 
elapsed time to wait
      * @param closure the closure to be executed
      */
-    private class Worker(interval: FiniteDuration, alwaysWait: Boolean, 
closure: () => Future[Any])(implicit logging: Logging) extends Actor {
+    private class Worker(
+        initialDelay: FiniteDuration,
+        interval: FiniteDuration,
+        alwaysWait: Boolean,
+        name: String,
+        closure: () => Future[Any])(
+            implicit logging: Logging,
+            transid: TransactionId) extends Actor {
         implicit val ec = context.dispatcher
 
-        override def preStart() = {
-            self ! Work
+        var lastSchedule: Option[Cancellable] = {
+            Some(context.system.scheduler.scheduleOnce(initialDelay, self, 
ScheduledWork))
+        }
+
+        override def postStop() = {
+            logging.info(this, s"$name shutdown")
+            lastSchedule.foreach(_.cancel())
         }
 
         def receive = {
-            case Work =>
+            case WorkOnceNow => Try(closure())
+
+            case ScheduledWork =>
                 val deadline = interval.fromNow
                 Try(closure()) match {
                     case Success(result) =>
                         result onComplete { _ =>
                             val timeToWait = if (alwaysWait) interval else 
deadline.timeLeft.max(Duration.Zero)
                             // context might be null here if a PoisonPill is 
sent while doing computations
-                            Option(context) foreach { 
_.system.scheduler.scheduleOnce(timeToWait, self, Work) }
+                            lastSchedule = 
Option(context).map(_.system.scheduler.scheduleOnce(timeToWait, self, 
ScheduledWork))
                         }
-                    case Failure(e) => logging.error(this, s"next iteration 
could not be scheduled because of ${e.getMessage}. Scheduler is halted")
+
+                    case Failure(e) =>
+                        logging.error(name, s"halted because ${e.getMessage}")
                 }
         }
     }
@@ -70,11 +88,19 @@ object Scheduler {
      * is immediately fired.
      *
      * @param interval the time to wait at most between two runs of the closure
+     * @param initialDelay optionally delay the first scheduled iteration by 
given duration
      * @param f the function to run
      */
-    def scheduleWaitAtMost(interval: FiniteDuration)(f: () => 
Future[Any])(implicit system: ActorSystem, logging: Logging) = {
+    def scheduleWaitAtMost(
+        interval: FiniteDuration,
+        initialDelay: FiniteDuration = Duration.Zero,
+        name: String = "Scheduler")(
+            f: () => Future[Any])(
+                implicit system: ActorSystem,
+                logging: Logging,
+                transid: TransactionId = TransactionId.unknown) = {
         require(interval > Duration.Zero)
-        system.actorOf(Props(new Worker(interval, false, f)))
+        system.actorOf(Props(new Worker(initialDelay, interval, false, name, 
f)))
     }
 
     /**
@@ -83,10 +109,18 @@ object Scheduler {
      * given interval.
      *
      * @param interval the time to wait between two runs of the closure
+     * @param initialDelay optionally delay the first scheduled iteration by 
given duration
      * @param f the function to run
      */
-    def scheduleWaitAtLeast(interval: FiniteDuration)(f: () => 
Future[Any])(implicit system: ActorSystem, logging: Logging) = {
+    def scheduleWaitAtLeast(
+        interval: FiniteDuration,
+        initialDelay: FiniteDuration = Duration.Zero,
+        name: String = "Scheduler")(
+            f: () => Future[Any])(
+                implicit system: ActorSystem,
+                logging: Logging,
+                transid: TransactionId = TransactionId.unknown) = {
         require(interval > Duration.Zero)
-        system.actorOf(Props(new Worker(interval, true, f)))
+        system.actorOf(Props(new Worker(initialDelay, interval, true, name, 
f)))
     }
 }
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index c895690..3491c21 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -91,20 +91,22 @@ object ActivationMessage extends DefaultJsonProtocol {
  */
 case class CompletionMessage(
     override val transid: TransactionId,
-    response: WhiskActivation,
+    response: Either[ActivationId, WhiskActivation],
     invoker: String)
     extends Message {
 
-    override def serialize = CompletionMessage.serdes.write(this).compactPrint
+    override def serialize: String = {
+        CompletionMessage.serdes.write(this).compactPrint
+    }
 
     override def toString = {
-        s"${response.activationId}"
+        response.fold(l => l, r => r.activationId).asString
     }
 }
 
 object CompletionMessage extends DefaultJsonProtocol {
-    def parse(msg: String) = Try(serdes.read(msg.parseJson))
-    implicit val serdes = jsonFormat3(CompletionMessage.apply)
+    def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
+    private val serdes = jsonFormat3(CompletionMessage.apply)
 }
 
 case class PingMessage(name: String) extends Message {
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala 
b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index f573b04..03f3271 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -1,11 +1,12 @@
 /*
- * Copyright 2015-2016 IBM Corporation
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala 
b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index b002981..21971b0 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -50,18 +50,10 @@ object ExecutionContextFactory {
             implicit val ec = system.dispatcher
             firstCompletedOf(Seq(f, expire(timeout, 
system.scheduler)(Future.failed(msg))))
         }
-    }
 
-    /**
-     * Extends a promise with an scheduled call back. The call back may be 
used to complete the promise. The result of the
-     * call back is not interesting to this method.
-     * The idiom to use is: promise after(duration, 
promise.tryFailure(TimeoutException)`.
-     */
-    implicit class PromiseExtensions[T](p: Promise[T]) {
-        def after(timeout: FiniteDuration, next: => Unit)(implicit system: 
ActorSystem): Promise[T] = {
+        def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => 
Future[T])(implicit system: ActorSystem): Future[T] = {
             implicit val ec = system.dispatcher
-            expire(timeout, system.scheduler)(Future { next })
-            p
+            firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt)))
         }
     }
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala 
b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index e685bf4..6c1db7c 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -37,7 +37,6 @@ import spray.json.DefaultJsonProtocol._
 import spray.routing.RequestContext
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
-import whisk.core.controller.actions.BlockingInvokeTimeout
 import whisk.core.controller.actions.PostActionActivation
 import whisk.core.database.NoDocumentException
 import whisk.core.entitlement._
@@ -59,7 +58,10 @@ object WhiskActionsApi {
     /** Grace period after action timeout limit to poll for result. */
     protected[core] val blockingInvokeGrace = 5 seconds
 
-    /** Max duration to wait for a blocking activation. */
+    /**
+     * Max duration to wait for a blocking activation.
+     * This is the default timeout on a POST request.
+     */
     protected[core] val maxWaitForBlockingActivation = 60 seconds
 }
 
@@ -221,11 +223,12 @@ trait WhiskActionsApi
                         onComplete(entitleReferencedEntities(user, 
Privilege.ACTIVATE, Some(action.exec))) {
                             case Success(_) =>
                                 val actionWithMergedParams = 
env.map(action.inherit(_)) getOrElse action
-                                onComplete(invokeAction(user, 
actionWithMergedParams, payload, blocking, Some(waitOverride))) {
-                                    case Success((activationId, None)) =>
+                                val waitForResponse = if (blocking) 
Some(waitOverride) else None
+                                onComplete(invokeAction(user, 
actionWithMergedParams, payload, waitForResponse, cause = None)) {
+                                    case Success(Left(activationId)) =>
                                         // non-blocking invoke or blocking 
invoke which got queued instead
                                         complete(Accepted, 
activationId.toJsObject)
-                                    case Success((activationId, 
Some(activation))) =>
+                                    case Success(Right(activation)) =>
                                         val response = if (result) 
activation.resultAsJson else activation.toExtendedJson
 
                                         if (activation.response.isSuccess) {
@@ -243,9 +246,6 @@ trait WhiskActionsApi
                                         } else {
                                             complete(InternalServerError, 
response)
                                         }
-                                    case Failure(t: BlockingInvokeTimeout) =>
-                                        logging.info(this, s"[POST] action 
activation waiting period expired")
-                                        complete(Accepted, 
t.activationId.toJsObject)
                                     case Failure(t: RecordTooLargeException) =>
                                         logging.info(this, s"[POST] action 
payload was too large")
                                         terminate(RequestEntityTooLarge)
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala 
b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index 9d30d70..c05c4c0 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -43,7 +43,6 @@ import spray.routing.RequestContext
 import spray.routing.Route
 import spray.http.HttpMethods.{ OPTIONS, GET, DELETE, POST, PUT, HEAD, PATCH }
 import whisk.common.TransactionId
-import whisk.core.controller.actions.BlockingInvokeTimeout
 import whisk.core.controller.actions.PostActionActivation
 import whisk.core.database._
 import whisk.core.entity._
@@ -336,6 +335,8 @@ trait WhiskWebActionsApi
     def routes(user: Identity)(implicit transid: TransactionId): Route = 
routes(Some(user))
     def routes()(implicit transid: TransactionId): Route = routes(None)
 
+    private val maxWaitForWebActionResult = 
Some(WhiskActionsApi.maxWaitForBlockingActivation)
+
     /**
      * Adds route to web based activations. Actions invoked this way are 
anonymous in that the
      * caller is not authenticated. The intended action must be named in the 
path as a fully qualified
@@ -540,8 +541,7 @@ trait WhiskWebActionsApi
             // they will be overwritten
             if (isRawHttpAction || 
context.overrides(webApiDirectives.reservedProperties ++ 
action.immutableParameters).isEmpty) {
                 val content = context.toActionArgument(onBehalfOf, 
isRawHttpAction)
-                val waitOverride = 
Some(WhiskActionsApi.maxWaitForBlockingActivation)
-                invokeAction(actionOwnerIdentity, action, 
Some(JsObject(content)), blocking = true, waitOverride)
+                invokeAction(actionOwnerIdentity, action, 
Some(JsObject(content)), maxWaitForWebActionResult, cause = None)
             } else {
                 Future.failed(RejectRequest(BadRequest, 
Messages.parametersNotAllowed))
             }
@@ -551,12 +551,12 @@ trait WhiskWebActionsApi
     }
 
     private def completeRequest(
-        queuedActivation: Future[(ActivationId, Option[WhiskActivation])],
+        queuedActivation: Future[Either[ActivationId, WhiskActivation]],
         projectResultField: => List[String],
         responseType: MediaExtension)(
             implicit transid: TransactionId) = {
         onComplete(queuedActivation) {
-            case Success((activationId, Some(activation))) =>
+            case Success(Right(activation)) =>
                 val result = activation.resultAsJson
 
                 if (activation.response.isSuccess || 
activation.response.isApplicationError) {
@@ -583,14 +583,9 @@ trait WhiskWebActionsApi
                     terminate(BadRequest, Messages.errorProcessingRequest)
                 }
 
-            case Success((activationId, None)) =>
+            case Success(Left(activationId)) =>
                 // blocking invoke which got queued instead
                 // this should not happen, instead it should be a blocking 
invoke timeout
-                logging.warn(this, "activation returned an id, expecting 
timeout error instead")
-                terminate(Accepted, Messages.responseNotReady)
-
-            case Failure(t: BlockingInvokeTimeout) =>
-                // blocking invoke which timed out waiting on response
                 logging.info(this, "activation waiting period expired")
                 terminate(Accepted, Messages.responseNotReady)
 
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
index eec3399..5e6be7d 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
@@ -18,14 +18,12 @@
 package whisk.core.controller.actions
 
 import scala.concurrent.Future
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration.FiniteDuration
 
 import spray.http.StatusCodes.BadRequest
 import spray.json._
 import whisk.common.TransactionId
 import whisk.core.controller.RejectRequest
-import whisk.core.controller.WhiskActionsApi._
 import whisk.core.controller.WhiskServices
 import whisk.core.entity._
 import whisk.http.Messages
@@ -40,25 +38,25 @@ protected[core] trait PostActionActivation extends 
PrimitiveActions with Sequenc
      * @param user the user posting the activation
      * @param action the action to activate (parameters for packaged actions 
must already be merged)
      * @param payload the parameters to pass to the action
-     * @param blocking iff true, wait for the activation result
-     * @param waitOverride iff blocking, wait up up to the action limit or a 
given max duration
-     * @return a future that resolves with the (activation id, and some whisk 
activation if a blocking invoke)
+     * @param waitForResponse if not empty, wait up to specified duration for 
a response (this is used for blocking activations)
+     * @return a future that resolves with Left(activation id) when the 
request is queued, or Right(activation) for a blocking request
+     *         which completes in time iff waiting for a response
      */
-    protected[controller] def invokeAction(user: Identity, action: 
WhiskAction, payload: Option[JsObject], blocking: Boolean, waitOverride: 
Option[FiniteDuration] = None)(
-        implicit transid: TransactionId): Future[(ActivationId, 
Option[WhiskActivation])] = {
+    protected[controller] def invokeAction(
+        user: Identity,
+        action: WhiskAction,
+        payload: Option[JsObject],
+        waitForResponse: Option[FiniteDuration],
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
         action.exec match {
             // this is a topmost sequence
             case SequenceExec(components) =>
-                val futureSeqTuple = invokeSequence(user, action, payload, 
blocking, topmost = true, components, cause = None, 0)
-                futureSeqTuple map { case (activationId, wskActivation, _) => 
(activationId, wskActivation) }
+                invokeSequence(user, action, components, payload, 
waitForResponse, cause, topmost = true, 0).map(r => r._1)
             case supportedExec if !supportedExec.deprecated =>
-                val duration = action.limits.timeout.duration + 
blockingInvokeGrace
-                val timeout = waitOverride.getOrElse(duration)
-                invokeSingleAction(user, action, payload, timeout, blocking)
+                invokeSingleAction(user, action, payload, waitForResponse, 
cause)
             case deprecatedExec =>
                 Future.failed(RejectRequest(BadRequest, 
Messages.runtimeDeprecated(deprecatedExec)))
         }
     }
 }
-
-protected[controller] case class BlockingInvokeTimeout(activationId: 
ActivationId) extends TimeoutException
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 0911a75..0a9034b 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -20,21 +20,28 @@ package whisk.core.controller.actions
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
+import scala.util.Failure
 
+import akka.actor.Actor
 import akka.actor.ActorSystem
+import akka.actor.Props
+
 import spray.json._
+
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
+import whisk.common.Scheduler
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
-import whisk.core.controller.WhiskActionsApi
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
 import whisk.core.entity.types.ActivationStore
 import whisk.core.entity.types.EntityStore
+import whisk.utils.ExecutionContextFactory.FutureExtensions
+import scala.collection.mutable.Buffer
+import akka.actor.Cancellable
 
 protected[actions] trait PrimitiveActions {
     /** The core collections require backend services to be injected in this 
trait. */
@@ -60,15 +67,11 @@ protected[actions] trait PrimitiveActions {
     /** Database service to get activations. */
     protected val activationStore: ActivationStore
 
-    /** Max duration for active ack. */
-    protected val activeAckTimeout = 
WhiskActionsApi.maxWaitForBlockingActivation
-
     /**
-     * Gets document from datastore to confirm a valid action activation then 
posts request to loadbalancer.
-     * If the loadbalancer accepts the requests with an activation id, then 
wait for the result of the activation
-     * if this is a blocking invoke, else return the activation id.
+     * Posts request to the loadbalancer. If the loadbalancer accepts the 
requests with an activation id,
+     * then wait for the result of the activation if necessary.
      *
-     * NOTE: This is a point-in-time type of statement:
+     * NOTE:
      * For activations of actions, cause is populated only for actions that 
were invoked as a result of a sequence activation.
      * For actions that are enclosed in a sequence and are activated as a 
result of the sequence activation, the cause
      * contains the activation id of the immediately enclosing sequence.
@@ -77,23 +80,25 @@ protected[actions] trait PrimitiveActions {
      * cause for c is the activation id of x
      * cause for s is not defined
      *
-     * @param subject the subject invoking the action
-     * @param docid the action document id
+     * @param user the identity invoking the action
+     * @param action the action to invoke
      * @param payload the dynamic arguments for the activation
-     * @param timeout the timeout used for polling for result if the invoke is 
blocking
-     * @param blocking true iff this is a blocking invoke
+     * @param waitForResponse if not empty, wait up to specified duration for a 
response (this is used for blocking activations)
      * @param cause the activation id that is responsible for this 
invoke/activation
      * @param transid a transaction id for logging
-     * @return a promise that completes with (ActivationId, 
Some(WhiskActivation)) if blocking else (ActivationId, None)
+     * @return a promise that completes with one of the following successful 
cases:
+     *            Right(WhiskActivation) if waiting for a response and 
response is ready within allowed duration,
+     *            Left(ActivationId) if not waiting for a response, or allowed 
duration has elapsed without a result ready
+     *         or these custom failures:
+     *            RequestEntityTooLarge if the message is too large to post 
to the message bus
      */
     protected[actions] def invokeSingleAction(
         user: Identity,
         action: WhiskAction,
         payload: Option[JsObject],
-        timeout: FiniteDuration,
-        blocking: Boolean,
-        cause: Option[ActivationId] = None)(
-            implicit transid: TransactionId): Future[(ActivationId, 
Option[WhiskActivation])] = {
+        waitForResponse: Option[FiniteDuration],
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
         require(action.exec.kind != Exec.SEQUENCE, "this method requires a 
primitive action")
 
         // merge package parameters with action (action parameters supersede), 
then merge in payload
@@ -109,95 +114,137 @@ protected[actions] trait PrimitiveActions {
             args,
             cause = cause)
 
-        val startActivation = transid.started(this, if (blocking) 
LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING else 
LoggingMarkers.CONTROLLER_ACTIVATION)
-        val startLoadbalancer = transid.started(this, 
LoggingMarkers.CONTROLLER_LOADBALANCER, s"[POST] action activation id: 
${message.activationId}")
-        val postedFuture = loadBalancer.publish(action, message, 
activeAckTimeout)
-        postedFuture flatMap { activationResponse =>
+        val startActivation = transid.started(this, waitForResponse.map(_ => 
LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING).getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION))
+        val startLoadbalancer = transid.started(this, 
LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: 
${message.activationId}")
+        val postedFuture = loadBalancer.publish(action, message)
+
+        postedFuture.flatMap { activeAckResponse =>
+            // successfully posted activation request to the message bus
             transid.finished(this, startLoadbalancer)
-            if (blocking) {
-                waitForActivationResponse(user, message.activationId, timeout, 
activationResponse) map {
-                    whiskActivation => (whiskActivation.activationId, 
Some(whiskActivation))
-                } andThen {
-                    case _ => transid.finished(this, startActivation)
-                }
-            } else {
+
+            // is caller waiting for the result of the activation?
+            waitForResponse.map { timeout =>
+                // yes, then wait for the activation response from the message 
bus
+                // (known as the active response or active ack)
+                waitForActivationResponse(user, message.activationId, timeout, 
activeAckResponse)
+                    .andThen { case _ => transid.finished(this, 
startActivation) }
+            }.getOrElse {
+                // no, return the activation id
                 transid.finished(this, startActivation)
-                Future.successful { (message.activationId, None) }
+                Future.successful(Left(message.activationId))
             }
         }
     }
 
     /**
-     * This is a fast path used for blocking calls in which we do not need the 
full WhiskActivation record from the DB.
-     * Polls for the activation response from an underlying data structure 
populated from Kafka active acknowledgements.
-     * If this mechanism fails to produce an answer quickly, the future will 
switch to polling the database for the response
-     * record.
+     * Waits for a response from the message bus (e.g., Kafka) containing the 
result of the activation. This is the fast path
+     * used for blocking calls where only the result of the activation is 
needed. This path is called active acknowledgement
+     * or active ack.
+     *
+     * While waiting for the active ack, periodically poll the datastore in 
case there is a failure in the fast path delivery
+     * which could happen if the connection from an invoker to the message bus 
is disrupted, or if the publishing of the response
+     * fails because the message is too large.
      */
-    private def waitForActivationResponse(user: Identity, activationId: 
ActivationId, totalWaitTime: FiniteDuration, activationResponse: 
Future[WhiskActivation])(implicit transid: TransactionId) = {
-        // this is the promise which active ack or db polling will try to 
complete in one of four ways:
-        // 1. active ack response
+    private def waitForActivationResponse(
+        user: Identity,
+        activationId: ActivationId,
+        totalWaitTime: FiniteDuration,
+        activeAckResponse: Future[Either[ActivationId, WhiskActivation]])(
+            implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
+        // this is the promise which active ack or db polling will try to 
complete via:
+        // 1. active ack response, or
         // 2. failing active ack (due to active ack timeout), fall over to db 
polling
         // 3. timeout on db polling => converts activation to non-blocking 
(returns activation id only)
-        // 4. internal error
-        val promise = Promise[WhiskActivation]
-        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
-
-        logging.info(this, s"[POST] action activation will block on result up 
to $totalWaitTime")
-
-        // the active ack will timeout after specified duration, causing the 
db polling to kick in
-        activationResponse map {
-            activation => promise.trySuccess(activation)
-        } onFailure {
-            case t: TimeoutException =>
-                logging.info(this, s"[POST] switching to poll db, active ack 
expired")
-                pollDbForResult(docid, activationId, promise)
-            case t: Throwable =>
-                logging.info(this, s"[POST] switching to poll db, active ack 
exception: ${t.getMessage}")
-                pollDbForResult(docid, activationId, promise)
+        // 4. internal error message
+        val promise = Promise[Either[ActivationId, WhiskActivation]] // this 
is strictly completed by the finishing actor
+        val finisher = actorSystem.actorOf(Props(new ActivationFinisher(user, 
activationId, promise)))
+
+        logging.info(this, s"action activation will block for result upto 
$totalWaitTime")
+
+        activeAckResponse map {
+            case result @ Right(_) =>
+                // activation complete, result is available
+                finisher ! Finish(result)
+
+            case _ =>
+                // active ack received but it does not carry the response,
+                // no result available except by polling the db
+                logging.warn(this, "pre-emptively polling db because active 
ack is missing result")
+                finisher ! Scheduler.WorkOnceNow
         }
 
-        // install a timeout handler; this is the handler for "the action took 
longer than its Limit"
-        // note the use of WeakReferences; this is to avoid the handler's 
closure holding on to the
-        // WhiskActivation, which in turn holds on to the full JsObject of the 
response
-        val promiseRef = new java.lang.ref.WeakReference(promise)
-        actorSystem.scheduler.scheduleOnce(totalWaitTime) {
-            val p = promiseRef.get
-            if (p != null) {
-                p.tryFailure(new BlockingInvokeTimeout(activationId))
+        // return the promise which is either fulfilled by active ack, polling 
from the database,
+        // or the timeout alternative when the allowed duration expires (i.e., 
the action took
+        // longer than permitted, per totalWaitTime).
+        promise.future.withAlternativeAfterTimeout(totalWaitTime, {
+            Future.successful(Left(activationId)).andThen {
+                // result no longer interesting; terminate the finisher/shut 
down db polling if necessary
+                case _ => actorSystem.stop(finisher)
             }
-        }
-
-        // return the future. note that pollDbForResult's isCompleted check 
will protect against unnecessary db activity
-        // that may overlap with a totalWaitTime timeout (because the promise 
will have already by tryFailure'd)
-        promise.future
+        })
     }
 
-    /**
-     * Polls for activation record. It is assumed that an activation record is 
created atomically and never updated.
-     * Fetch the activation record by its id. If it exists, complete the 
promise. Otherwise recursively poll until
-     * either there is an error in the get, or the promise has completed 
because it timed out. The promise MUST
-     * complete in the caller to terminate the polling.
-     */
-    private def pollDbForResult(
-        docid: DocId,
+    /** Periodically polls the db to cover missing active acks. */
+    val datastorePollPeriodForActivation = 15.seconds
+    val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 
7.seconds)
+
+    protected case class Finish(activation: Right[ActivationId, 
WhiskActivation])
+
+    protected class ActivationFinisher(
+        user: Identity,
         activationId: ActivationId,
-        promise: Promise[WhiskActivation])(
-            implicit transid: TransactionId): Unit = {
-        // check if promise already completed due to timeout expiration (abort 
polling if so)
-        if (!promise.isCompleted) {
-            WhiskActivation.get(activationStore, docid) map {
-                activation => promise.trySuccess(activation.withoutLogs) // 
Logs always not provided on blocking call
-            } onFailure {
-                case e: NoDocumentException =>
-                    Thread.sleep(500)
-                    logging.debug(this, s"[POST] action activation not yet 
timed out, will poll for result")
-                    pollDbForResult(docid, activationId, promise)
-                case t: Throwable =>
-                    logging.error(this, s"[POST] action activation failed 
while waiting on result: ${t.getMessage}")
-                    promise.tryFailure(t)
-            }
-        } else {
-            logging.info(this, s"[POST] action activation timed out, 
terminated polling for result")
+        promise: Promise[Either[ActivationId, WhiskActivation]])(
+            implicit transid: TransactionId) extends Actor {
+
+        // when the future completes, self-destruct
+        promise.future.andThen { case _ => shutdown() }
+
+        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
+        val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty
+
+        val poller = {
+            Scheduler.scheduleWaitAtMost(
+                datastorePollPeriodForActivation,
+                initialDelay = datastorePollPeriodForActivation,
+                name = "dbpoll")(() => {
+                    if (!promise.isCompleted) {
+                        WhiskActivation.get(activationStore, docid) map {
+                            // complete the future, which in turn will poison 
pill this scheduler
+                            activation =>
+                                
promise.trySuccess(Right(activation.withoutLogs)) // Logs always not provided 
on blocking call
+                        } andThen {
+                            case Failure(e: NoDocumentException) => // do 
nothing, scheduler will reschedule another poll
+                            case Failure(t: Throwable) => // something went 
wrong, abort
+                                logging.error(this, s"failed while waiting on 
result: ${t.getMessage}")
+                                promise.tryFailure(t) // complete the future, 
which in turn will poison pill this scheduler
+                        }
+                    } else Future.successful({}) // the scheduler will be 
halted because the promise is now resolved
+                })
+        }
+
+        def receive = {
+            case Finish(activation) =>
+                promise.trySuccess(activation)
+
+            case msg @ Scheduler.WorkOnceNow =>
+                // try up to four times when pre-empting the schedule
+                datastorePreemptivePolling.foreach {
+                    s => preemptiveMsgs += 
context.system.scheduler.scheduleOnce(s, poller, msg)
+                }
+
+            case msg =>
+                poller ! msg
+        }
+
+        def shutdown(): Unit = {
+            context.stop(poller)
+            context.stop(self)
+        }
+
+        override def postStop() = {
+            logging.info(this, "finisher shutdown")
+            preemptiveMsgs.foreach(_.cancel())
+            context.stop(poller)
         }
     }
 }
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 042bc08..4ec48d8 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -30,10 +30,10 @@ import scala.util.Failure
 import scala.util.Success
 
 import akka.actor.ActorSystem
+
 import spray.json._
 import whisk.common.Logging
 import whisk.common.TransactionId
-import whisk.core.controller.WhiskActionsApi._
 import whisk.core.controller.WhiskServices
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
@@ -60,14 +60,13 @@ protected[actions] trait SequenceActions {
     protected val activationStore: ActivationStore
 
     /** A method that knows how to invoke a single primitive action. */
-    protected[actions] def invokeSingleAction(
+    protected[actions] def invokeAction(
         user: Identity,
         action: WhiskAction,
         payload: Option[JsObject],
-        timeout: FiniteDuration,
-        blocking: Boolean,
-        cause: Option[ActivationId] = None)(
-            implicit transid: TransactionId): Future[(ActivationId, 
Option[WhiskActivation])]
+        waitForResponse: Option[FiniteDuration],
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]]
 
     /**
      * Executes a sequence by invoking in a blocking fashion each of its 
components.
@@ -86,13 +85,13 @@ protected[actions] trait SequenceActions {
     protected[actions] def invokeSequence(
         user: Identity,
         action: WhiskAction,
-        payload: Option[JsObject],
-        blocking: Boolean,
-        topmost: Boolean,
         components: Vector[FullyQualifiedEntityName],
+        payload: Option[JsObject],
+        waitForOutermostResponse: Option[FiniteDuration],
         cause: Option[ActivationId],
+        topmost: Boolean,
         atomicActionsCount: Int)(
-            implicit transid: TransactionId): Future[(ActivationId, 
Option[WhiskActivation], Int)] = {
+            implicit transid: TransactionId): Future[(Either[ActivationId, 
WhiskActivation], Int)] = {
         require(action.exec.kind == Exec.SEQUENCE, "this method requires an 
action sequence")
 
         // create new activation id that corresponds to the sequence
@@ -100,7 +99,10 @@ protected[actions] trait SequenceActions {
         logging.info(this, s"invoking sequence $action topmost $topmost 
activationid '$seqActivationId'")
 
         val start = Instant.now(Clock.systemUTC())
-        val futureSeqResult = {
+        val futureSeqResult: Future[(Either[ActivationId, WhiskActivation], 
Int)] = {
+            // even though the result of completeSequenceActivation is 
Right[WhiskActivation],
+            // use a more general type for futureSeqResult in case a blocking 
invoke takes
+            // longer than expected and we must return Left[ActivationId] 
instead
             completeSequenceActivation(
                 seqActivationId,
                 // the cause for the component activations is the current 
sequence
@@ -109,17 +111,16 @@ protected[actions] trait SequenceActions {
         }
 
         if (topmost) { // need to deal with blocking and closing connection
-            if (blocking) {
+            waitForOutermostResponse.map { timeout =>
                 logging.info(this, s"invoke sequence blocking topmost!")
-                val timeout = maxWaitForBlockingActivation + 
blockingInvokeGrace
-                // if the future fails with a timeout, the failure is dealt 
with at the caller level
-                futureSeqResult.withTimeout(timeout, new 
BlockingInvokeTimeout(seqActivationId))
-            } else {
+                futureSeqResult.withAlternativeAfterTimeout(timeout, 
Future.successful(Left(seqActivationId), atomicActionsCount))
+            }.getOrElse {
                 // non-blocking sequence execution, return activation id
-                Future.successful((seqActivationId, None, 0))
+                Future.successful(Left(seqActivationId), 0)
             }
         } else {
             // not topmost, no need to worry about terminating incoming request
+            // and this is therefore a blocking activation by definition
             // Note: the future for the sequence result recovers from all 
throwable failures
             futureSeqResult
         }
@@ -136,20 +137,20 @@ protected[actions] trait SequenceActions {
         topmost: Boolean,
         start: Instant,
         cause: Option[ActivationId])(
-            implicit transid: TransactionId): Future[(ActivationId, 
Some[WhiskActivation], Int)] = {
+            implicit transid: TransactionId): Future[(Right[ActivationId, 
WhiskActivation], Int)] = {
         // not topmost, no need to worry about terminating incoming request
         // Note: the future for the sequence result recovers from all 
throwable failures
         futureSeqResult.map { accounting =>
             // sequence terminated, the result of the sequence is the result 
of the last completed activation
             val end = Instant.now(Clock.systemUTC())
             val seqActivation = makeSequenceActivation(user, action, 
seqActivationId, accounting, topmost, cause, start, end)
-            (seqActivationId, Some(seqActivation), accounting.atomicActionCnt)
+            (Right(seqActivation), accounting.atomicActionCnt)
         }.andThen {
-            case Success((_, Some(seqActivation), _)) => 
storeSequenceActivation(seqActivation)
-            case Failure(t) =>
-                // This should never happen; in this case, there is no 
activation record created or stored:
-                // should there be?
-                logging.error(this, s"Sequence activation failed: 
${t.getMessage}")
+            case Success((Right(seqActivation), _)) => 
storeSequenceActivation(seqActivation)
+
+            // This should never happen; in this case, there is no activation 
record created or stored:
+            // should there be?
+            case Failure(t)                         => logging.error(this, 
s"Sequence activation failed: ${t.getMessage}")
         }
     }
 
@@ -312,27 +313,27 @@ protected[actions] trait SequenceActions {
             val futureWhiskActivationTuple = action.exec match {
                 case SequenceExec(components) =>
                     logging.info(this, s"sequence invoking an enclosed 
sequence $action")
-                    // call invokeSequence to invoke the inner sequence
-                    invokeSequence(user, action, inputPayload, blocking = 
true, topmost = false, components, cause, accounting.atomicActionCnt)
+                    // call invokeSequence to invoke the inner sequence; this 
is a blocking activation by definition
+                    invokeSequence(user, action, components, inputPayload, 
None, cause, topmost = false, accounting.atomicActionCnt)
                 case _ =>
                     // this is an invoke for an atomic action
                     logging.info(this, s"sequence invoking an enclosed atomic 
action $action")
-                    val timeout = action.limits.timeout.duration + 
blockingInvokeGrace
-                    invokeSingleAction(user, action, inputPayload, timeout, 
blocking = true, cause) map {
-                        case (activationId, wskActivation) => (activationId, 
wskActivation, accounting.atomicActionCnt + 1)
+                    val timeout = action.limits.timeout.duration + 1.minute
+                    invokeAction(user, action, inputPayload, waitForResponse = 
Some(timeout), cause) map {
+                        case res => (res, accounting.atomicActionCnt + 1)
                     }
             }
 
             futureWhiskActivationTuple.map {
-                case (activationId, wskActivation, atomicActionCountSoFar) =>
-                    wskActivation.map {
-                        activation => accounting.maybe(activation, 
atomicActionCountSoFar, actionSequenceLimit)
-                    }.getOrElse {
-                        // the wskActivation is None only if the result could 
not be retrieved in time either from active ack or from db
-                        logging.error(this, s"component activation timedout 
for $activationId")
-                        val activationResponse = 
ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId))
-                        accounting.fail(activationResponse, Some(activationId))
-                    }
+                case (Right(activation), atomicActionCountSoFar) =>
+                    accounting.maybe(activation, atomicActionCountSoFar, 
actionSequenceLimit)
+
+                case (Left(activationId), atomicActionCountSoFar) =>
+                    // the result could not be retrieved in time either from 
active ack or from db
+                    logging.error(this, s"component activation timedout for 
$activationId")
+                    val activationResponse = 
ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId))
+                    accounting.fail(activationResponse, Some(activationId))
+
             }.recover {
                 // check any failure here and generate an activation response 
to encapsulate
                 // the failure mode; consider this failure a whisk error
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 636f157..5390a3a 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -25,9 +25,7 @@ import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
 import scala.util.Failure
 import scala.util.Success
 
@@ -56,6 +54,8 @@ import scala.annotation.tailrec
 
 trait LoadBalancer {
 
+    val activeAckTimeoutGrace = 1.minute
+
     /**
      * Retrieves a per subject map of counts representing in-flight 
activations as seen by the load balancer
      *
@@ -64,16 +64,18 @@ trait LoadBalancer {
     def getActiveUserActivationCounts: Map[String, Int]
 
     /**
-     * Publishes activation message on internal bus for the invoker to pick up.
+     * Publishes activation message on internal bus for an invoker to pick up.
      *
+     * @param action the action to invoke
      * @param msg the activation message to publish on an invoker topic
-     * @param timeout the desired active ack timeout
      * @param transid the transaction id for the request
      * @return result a nested Future the outer indicating completion of 
publishing and
      *         the inner the completion of the action (i.e., the result)
-     *         if it is ready before timeout otherwise the future fails with 
ActiveAckTimeout
+     *         if it is ready before timeout (Right) otherwise the activation 
id (Left).
+     *         The future is guaranteed to complete within the declared action 
time limit
+     *         plus a grace period (see activeAckTimeoutGrace).
      */
-    def publish(action: WhiskAction, msg: ActivationMessage, timeout: 
FiniteDuration)(implicit transid: TransactionId): 
Future[Future[WhiskActivation]]
+    def publish(action: WhiskAction, msg: ActivationMessage)(implicit transid: 
TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
 
 }
 
@@ -88,11 +90,11 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
 
     override def getActiveUserActivationCounts: Map[String, Int] = 
activationBySubject.toMap mapValues { _.size }
 
-    override def publish(action: WhiskAction, msg: ActivationMessage, timeout: 
FiniteDuration)(
-        implicit transid: TransactionId): Future[Future[WhiskActivation]] = {
+    override def publish(action: WhiskAction, msg: ActivationMessage)(
+        implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
         chooseInvoker(action, msg).flatMap { invokerName =>
             val subject = msg.user.subject.asString
-            val entry = setupActivation(msg.activationId, subject, 
invokerName, timeout, transid)
+            val entry = setupActivation(action, msg.activationId, subject, 
invokerName, transid)
             sendActivationToInvoker(messageProducer, msg, invokerName).map { _ 
=>
                 entry.promise.future
             }
@@ -105,7 +107,7 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
      * A map storing current activations based on ActivationId.
      * The promise value represents the obligation of writing the answer back.
      */
-    case class ActivationEntry(id: ActivationId, subject: String, invokerName: 
String, created: Instant, promise: Promise[WhiskActivation])
+    case class ActivationEntry(id: ActivationId, subject: String, invokerName: 
String, created: Instant, promise: Promise[Either[ActivationId, 
WhiskActivation]])
     type TrieSet[T] = TrieMap[T, Unit]
     private val activationById = new TrieMap[ActivationId, ActivationEntry]
     private val activationByInvoker = new TrieMap[String, 
TrieSet[ActivationEntry]]
@@ -117,42 +119,44 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
      *
      * @param msg is the kafka message payload as Json
      */
-    private def processCompletion(msg: CompletionMessage) = {
-        implicit val tid = msg.transid
-        val aid = msg.response.activationId
-        logging.info(this, s"received active ack for '$aid'")
-        val response = msg.response
+    private def processCompletion(response: Either[ActivationId, 
WhiskActivation], tid: TransactionId, forced: Boolean): Unit = {
+        val aid = response.fold(l => l, r => r.activationId)
         activationById.remove(aid) match {
             case Some(entry @ ActivationEntry(_, subject, invokerIndex, _, p)) 
=>
+                logging.info(this, s"${if (!forced) "received" else "forced"} 
active ack for '$aid'")(tid)
                 activationByInvoker.getOrElseUpdate(invokerIndex, new 
TrieSet[ActivationEntry]).remove(entry)
                 activationBySubject.getOrElseUpdate(subject, new 
TrieSet[ActivationEntry]).remove(entry)
-                p.trySuccess(response)
-                logging.info(this, s"processed active response for '$aid'")
+                if (!forced) {
+                    p.trySuccess(response)
+                } else {
+                    p.tryFailure(new Throwable("no active ack received"))
+                }
+
             case None =>
-                logging.warn(this, s"processed active response for '$aid' 
which has no entry")
+                // the entry was already removed
+                logging.debug(this, s"received active ack for '$aid' which has 
no entry")(tid)
         }
     }
 
     /**
      * Creates an activation entry and insert into various maps.
      */
-    private def setupActivation(activationId: ActivationId, subject: String, 
invokerName: String, timeout: FiniteDuration, transid: TransactionId): 
ActivationEntry = {
+    private def setupActivation(action: WhiskAction, activationId: 
ActivationId, subject: String, invokerName: String, transid: TransactionId): 
ActivationEntry = {
         // either create a new promise or reuse a previous one for this 
activation if it exists
+        val timeout = action.limits.timeout.duration + activeAckTimeoutGrace
         val entry = activationById.getOrElseUpdate(activationId, {
-
-            // install a timeout handler; this is the handler for "the action 
took longer than ActiveAckTimeout"
-            // note the use of WeakReferences; this is to avoid the handler's 
closure holding on to the
-            // WhiskActivation, which in turn holds on to the full JsObject of 
the response
-            // NOTE: we do not remove the entry from the maps, as this is done 
only by processCompletion
-            val promiseRef = new 
java.lang.ref.WeakReference(Promise[WhiskActivation])
+            val promiseRef = new 
java.lang.ref.WeakReference(Promise[Either[ActivationId, WhiskActivation]])
+
+            // Install a timeout handler for the catastrophic case where an 
active ack is not received at all
+            // (because say an invoker is down completely, or the connection 
to the message bus is disrupted) or when
+            // the active ack is significantly delayed (possibly due to long 
queues but the subject should not be penalized);
+            // in this case, if the activation handler is still registered, 
remove it and update the books.
+            // Note the use of WeakReferences; this is to avoid the handler's 
closure holding on to the
+            // WhiskActivation, which in turn holds on to the full JsObject of 
the response.
             actorSystem.scheduler.scheduleOnce(timeout) {
-                activationById.get(activationId).foreach { _ =>
-                    val p = promiseRef.get
-                    if (p != null && p.tryFailure(new 
ActiveAckTimeout(activationId))) {
-                        logging.info(this, "active response timed 
out")(transid)
-                    }
-                }
+                processCompletion(Left(activationId), transid, forced = true)
             }
+
             ActivationEntry(activationId, subject, invokerName, 
Instant.now(Clock.systemUTC()), promiseRef.get)
         })
 
@@ -238,8 +242,10 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
         val raw = new String(bytes, StandardCharsets.UTF_8)
         CompletionMessage.parse(raw) match {
             case Success(m: CompletionMessage) =>
-                processCompletion(m)
-                invokerPool ! InvocationFinishedMessage(m.invoker, 
!m.response.response.isWhiskError)
+                processCompletion(m.response, m.transid, false)
+                // treat left as success (as it is the result of the message 
exceeding the bus limit)
+                val isSuccess = m.response.fold(l => true, r => 
!r.response.isWhiskError)
+                invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess)
 
             case Failure(t) => logging.error(this, s"failed processing 
message: $raw with $t")
         }
@@ -384,5 +390,4 @@ object LoadBalancerService {
 
 }
 
-private case class ActiveAckTimeout(activationId: ActivationId) extends 
TimeoutException
 private case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 813cc6b..a655023 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -47,6 +47,11 @@ import whisk.core.entity._
 import whisk.http.BasicHttpService
 import whisk.http.Messages
 import whisk.utils.ExecutionContextFactory
+import whisk.common.Scheduler
+import whisk.core.connector.PingMessage
+import scala.util.Try
+import whisk.core.connector.MessageProducer
+import org.apache.kafka.common.errors.RecordTooLargeException
 
 /**
  * A kafka message handler that invokes actions as directed by message on 
topic "/actions/invoke".
@@ -92,7 +97,7 @@ class Invoker(
         // hence when the transaction is fully processed, this method will 
complete a promise with the datastore
         // future writing back the activation record and for which there are 
three cases:
         // 1. success: there were no exceptions and hence the invoke path 
operated normally,
-        // 2. error during invocation: an exception occurred while trying to 
run the action,
+        // 2. error during invocation: an exception occurred while trying to 
run the action (failed to bring up a container for example),
         // 3. error fetching action: an exception occurred reading from the 
db, didn't get to run.
         val transactionPromise = Promise[DocInfo]
 
@@ -102,6 +107,7 @@ class Invoker(
         if (actionid.rev == DocRevision.empty) {
             logging.error(this, s"revision was not provided for 
${actionid.id}")
         }
+
         WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty) onComplete {
             case Success(action) =>
                 // only Exec instances that are subtypes of CodeExec reach the 
invoker
@@ -143,8 +149,12 @@ class Invoker(
         implicit transid: TransactionId): Future[DocInfo] = {
         val msg = tran.msg
         val interval = computeActivationInterval(tran)
-        val activation = makeWhiskActivation(msg, EntityPath(name.id), 
version, response, interval, limits)
-        completeTransaction(tran, activation, FailedActivation(transid))
+        val activationResult = makeWhiskActivation(msg, EntityPath(name.id), 
version, response, interval, limits)
+
+        // send active ack for failed activations
+        sendActiveAck(tran, activationResult)
+
+        completeTransaction(tran, activationResult, FailedActivation(transid))
     }
 
     /*
@@ -189,7 +199,8 @@ class Invoker(
         } map {
             case (failedInit, con, result) =>
                 // process the result and send active ack message
-                val activationResult = sendActiveAck(tran, action, failedInit, 
result)
+                val activationResult = makeActivationResultForSuccess(tran, 
action, failedInit, result)
+                sendActiveAck(tran, activationResult)
 
                 // after sending active ack, drain logs and return container
                 val contents = getContainerLogs(con, 
action.exec.asInstanceOf[CodeExec[_]].sentinelledLogs, action.limits.logs)
@@ -241,26 +252,40 @@ class Invoker(
     }
 
     /**
-     * Creates WhiskActivation for the "run result" (which could be a failed 
initialization) and send
-     * ActiveAck message.
+     * Creates WhiskActivation for the "run result" (which could be a failed 
initialization); this
+     * method is only reached if the action actually ran with no invoker 
exceptions.
      *
      * @return WhiskActivation
      */
-    private def sendActiveAck(tran: Transaction, action: WhiskAction, 
failedInit: Boolean, result: RunResult)(
+    private def makeActivationResultForSuccess(tran: Transaction, action: 
WhiskAction, failedInit: Boolean, result: RunResult)(
         implicit transid: TransactionId): WhiskActivation = {
         if (!failedInit) tran.runInterval = Some(result.interval)
 
         val msg = tran.msg
         val activationInterval = computeActivationInterval(tran)
         val activationResponse = getActivationResponse(activationInterval, 
action.limits.timeout.duration, result, failedInit)
-        val activationResult = makeWhiskActivation(msg, 
EntityPath(action.fullyQualifiedName(false).toString), action.version, 
activationResponse, activationInterval, Some(action.limits))
-        val completeMsg = CompletionMessage(transid, activationResult, 
this.name)
+        makeWhiskActivation(msg, 
EntityPath(action.fullyQualifiedName(false).toString), action.version, 
activationResponse, activationInterval, Some(action.limits))
+    }
 
-        producer.send(s"completed${msg.rootControllerIndex.toInt}", 
completeMsg) map { status =>
-            logging.info(this, s"posted completion of activation 
${msg.activationId}")
+    /**
+     * Sends ActiveAck message for a completed activation.
+     * If for some reason posting to the message bus fails, an active ack may 
not be sent.
+     */
+    private def sendActiveAck(tran: Transaction, activationResult: 
WhiskActivation)(
+        implicit transid: TransactionId): Unit = {
+
+        def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean 
= false) = {
+            val msg = CompletionMessage(transid, res, this.name)
+            producer.send(s"completed${tran.msg.rootControllerIndex.toInt}", 
msg).andThen {
+                case Success(_) =>
+                    logging.info(this, s"posted ${if (recovery) "recovery" 
else ""} completion of activation ${activationResult.activationId}")
+            }
         }
 
-        activationResult
+        send(Right(activationResult)).onFailure {
+            case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
+                send(Left(activationResult.activationId), recovery = true)
+        }
     }
 
     // The nodeJsAction runner inserts this line in the logs at the end
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index b90db49..d620e3d 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -23,6 +23,8 @@ import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 
+import org.apache.kafka.common.errors.RecordTooLargeException
+
 import akka.actor.ActorRef
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
@@ -107,10 +109,20 @@ class InvokerReactive(
     }
 
     /** Sends an active-ack. */
-    val ack = (tid: TransactionId, activation: WhiskActivation, 
controllerInstance: InstanceId) => {
+    val ack = (tid: TransactionId, activationResult: WhiskActivation, 
controllerInstance: InstanceId) => {
         implicit val transid = tid
-        producer.send(s"completed${controllerInstance.toInt}", 
CompletionMessage(tid, activation, s"invoker${instance.toInt}")).andThen {
-            case Success(_) => logging.info(this, s"posted completion of 
activation ${activation.activationId}")
+
+        def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean 
= false) = {
+            val msg = CompletionMessage(transid, res, this.name)
+            producer.send(s"completed${controllerInstance.toInt}", 
msg).andThen {
+                case Success(_) =>
+                    logging.info(this, s"posted ${if (recovery) "recovery" 
else ""} completion of activation ${activationResult.activationId}")
+            }
+        }
+
+        send(Right(activationResult)).recoverWith {
+            case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
+                send(Left(activationResult.activationId), recovery = true)
         }
     }
 
diff --git a/tests/src/test/scala/whisk/common/SchedulerTests.scala 
b/tests/src/test/scala/whisk/common/SchedulerTests.scala
index 1e3df96..cc2a56b 100644
--- a/tests/src/test/scala/whisk/common/SchedulerTests.scala
+++ b/tests/src/test/scala/whisk/common/SchedulerTests.scala
@@ -117,6 +117,21 @@ class SchedulerTests
         callCount shouldBe 1
     }
 
+    it should "log scheduler halt message with tid" in {
+        implicit val transid = TransactionId.testing
+        val msg = "test threw an exception"
+
+        stream.reset()
+        val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () =>
+            throw new Exception(msg)
+        }
+
+        waitForCalls()
+        stream.toString.split(" ").drop(1).mkString(" ") shouldBe {
+            s"[ERROR] [#sid_1] [Scheduler] halted because $msg\n"
+        }
+    }
+
     it should "not stop the scheduler if the future from the closure is 
failed" in {
         var callCount = 0
 
@@ -146,10 +161,54 @@ class SchedulerTests
 
         val differences = calculateDifferences(calls)
         withClue(s"expecting all $differences to be <= $timeBetweenCalls") {
+            differences should not be 'empty
             differences.forall(_ <= timeBetweenCalls + schedulerSlack)
         }
     }
 
+    it should "delay initial schedule by given duration" in {
+        val timeBetweenCalls = 200 milliseconds
+        val initialDelay = 1.second
+        var callCount = 0
+
+        val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, 
initialDelay) { () =>
+            callCount += 1
+            Future successful true
+        }
+
+        try {
+            Thread.sleep(initialDelay.toMillis)
+            callCount should be <= 1
+
+            Thread.sleep(2 * timeBetweenCalls.toMillis)
+            callCount should be > 1
+        } finally {
+            scheduled ! PoisonPill
+        }
+    }
+
+    it should "perform work immediately when requested" in {
+        val timeBetweenCalls = 200 milliseconds
+        val initialDelay = 1.second
+        var callCount = 0
+
+        val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, 
initialDelay) { () =>
+            callCount += 1
+            Future successful true
+        }
+
+        try {
+            Thread.sleep(2 * timeBetweenCalls.toMillis)
+            callCount should be(0)
+
+            scheduled ! Scheduler.WorkOnceNow
+            Thread.sleep(timeBetweenCalls.toMillis)
+            callCount should be(1)
+        } finally {
+            scheduled ! PoisonPill
+        }
+    }
+
     it should "not wait when the closure takes longer than the interval" in {
         val calls = Buffer[Instant]()
         val timeBetweenCalls = 200 milliseconds
@@ -165,6 +224,7 @@ class SchedulerTests
 
         val differences = calculateDifferences(calls)
         withClue(s"expecting all $differences to be <= $computationTime") {
+            differences should not be 'empty
             differences.forall(_ <= computationTime + schedulerSlack)
         }
     }
diff --git 
a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala 
b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
new file mode 100644
index 0000000..01f0282
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.tests
+
+import java.time.Instant
+
+import scala.util.Success
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.connector.CompletionMessage
+import whisk.core.entity._
+import whisk.core.entity.size.SizeInt
+
+/**
+ * Unit tests for the CompletionMessage object.
+ */
+@RunWith(classOf[JUnitRunner])
+class CompletionMessageTests extends FlatSpec with Matchers {
+
+    behavior of "completion message"
+
+    val activation = WhiskActivation(
+        namespace = EntityPath("ns"),
+        name = EntityName("a"),
+        Subject(),
+        activationId = ActivationId(),
+        start = Instant.now(),
+        end = Instant.now(),
+        response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(1)))),
+        annotations = Parameters("limits", ActionLimits(
+            TimeLimit(1.second),
+            MemoryLimit(128.MB),
+            LogLimit(1.MB)).toJson),
+        duration = Some(123))
+
+    it should "serialize a left completion message" in {
+        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), 
"xyz")
+        m.serialize shouldBe JsObject(
+            "transid" -> m.transid.toJson,
+            "response" -> m.response.left.get.toJson,
+            "invoker" -> m.invoker.toJson).compactPrint
+    }
+
+    it should "serialize a right completion message" in {
+        val m = CompletionMessage(TransactionId.testing, Right(activation), 
"xyz")
+        m.serialize shouldBe JsObject(
+            "transid" -> m.transid.toJson,
+            "response" -> m.response.right.get.toJson,
+            "invoker" -> m.invoker.toJson).compactPrint
+    }
+
+    it should "deserialize a left completion message" in {
+        val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), 
"xyz")
+        CompletionMessage.parse(m.serialize) shouldBe Success(m)
+    }
+
+    it should "deserialize a right completion message" in {
+        val m = CompletionMessage(TransactionId.testing, Right(activation), 
"xyz")
+        CompletionMessage.parse(m.serialize) shouldBe Success(m)
+    }
+}
diff --git 
a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala 
b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 8d8dc1d..4cc0590 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -201,7 +201,7 @@ class DegenerateLoadBalancerService(config: 
WhiskConfig)(implicit ec: ExecutionC
 
     override def getActiveUserActivationCounts: Map[String, Int] = Map()
 
-    override def publish(action: WhiskAction, msg: ActivationMessage, timeout: 
FiniteDuration)(implicit transid: TransactionId): 
Future[Future[WhiskActivation]] =
+    override def publish(action: WhiskAction, msg: ActivationMessage)(implicit 
transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
         Future.successful {
             whiskActivationStub map {
                 case (timeout, activation) => Future {
@@ -210,7 +210,7 @@ class DegenerateLoadBalancerService(config: 
WhiskConfig)(implicit ec: ExecutionC
                         Thread.sleep(timeout.toMillis)
                         println(".... done waiting")
                     }
-                    activation
+                    Right(activation)
                 }
             } getOrElse Future.failed(new IllegalArgumentException("Unit test 
does not need fast path"))
         }
diff --git 
a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala 
b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index 1d3e932..9a4dc92 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -260,8 +260,13 @@ trait WebActionsApiTests extends ControllerTestCommon with 
BeforeAndAfterEach wi
         }
     }
 
-    override protected[controller] def invokeAction(user: Identity, action: 
WhiskAction, payload: Option[JsObject], blocking: Boolean, waitOverride: 
Option[FiniteDuration] = None)(
-        implicit transid: TransactionId): Future[(ActivationId, 
Option[WhiskActivation])] = {
+    override protected[controller] def invokeAction(
+        user: Identity,
+        action: WhiskAction,
+        payload: Option[JsObject],
+        waitForResponse: Option[FiniteDuration],
+        cause: Option[ActivationId])(
+            implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
         invocationCount = invocationCount + 1
 
         if (failActivation == 0) {
@@ -304,9 +309,9 @@ trait WebActionsApiTests extends ControllerTestCommon with 
BeforeAndAfterEach wi
             }
             action.parameters.get("z") shouldBe 
defaultActionParameters.get("z")
 
-            Future.successful(activation.activationId, Some(activation))
+            Future.successful(Right(activation))
         } else if (failActivation == 1) {
-            Future.successful(ActivationId(), None)
+            Future.successful(Left(ActivationId()))
         } else {
             Future.failed(new IllegalStateException("bad activation"))
         }
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala 
b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index 5784970..ee43dbe 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -120,27 +120,39 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers {
             }
     }
 
-    it should "succeed but truncate result, if result exceeds its limit" in 
withAssetCleaner(wskprops) {
-        (wp, assetHelper) =>
-            val name = "TestActionCausingExcessiveResult"
-            assetHelper.withCleaner(wsk.action, name) {
-                val actionName = 
TestUtils.getTestActionFilename("sizedResult.js")
-                (action, _) => action.create(name, Some(actionName))
-            }
+    Seq(true, false).foreach { blocking =>
+        it should s"succeed but truncate result, if result exceeds its limit 
(blocking: $blocking)" in withAssetCleaner(wskprops) {
+            (wp, assetHelper) =>
+                val name = "TestActionCausingExcessiveResult"
+                assetHelper.withCleaner(wsk.action, name) {
+                    val actionName = 
TestUtils.getTestActionFilename("sizedResult.js")
+                    (action, _) => action.create(name, Some(actionName), 
timeout = Some(15.seconds))
+                }
 
-            val allowedSize = 
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
-            val run = wsk.action.invoke(name, Map("size" -> (allowedSize + 
1).toJson, "char" -> "a".toJson))
-            withActivation(wsk.activation, run) { activation =>
-                val response = activation.response
-                response.success shouldBe false
-                response.status shouldBe 
ActivationResponse.messageForCode(ActivationResponse.ContainerError)
-                val msg = 
response.result.get.fields(ActivationResponse.ERROR_FIELD).convertTo[String]
-                val expected = Messages.truncatedResponse((allowedSize + 
10).B, allowedSize.B)
-                withClue(s"is: ${msg.take(expected.length)}\nexpected: 
$expected") {
-                    msg.startsWith(expected) shouldBe true
+                val allowedSize = 
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes
+
+                def checkResponse(activation: CliActivation) = {
+                    val response = activation.response
+                    response.success shouldBe false
+                    response.status shouldBe 
ActivationResponse.messageForCode(ActivationResponse.ContainerError)
+                    val msg = 
response.result.get.fields(ActivationResponse.ERROR_FIELD).convertTo[String]
+                    val expected = Messages.truncatedResponse((allowedSize + 
10).B, allowedSize.B)
+                    withClue(s"is: ${msg.take(expected.length)}\nexpected: 
$expected") {
+                        msg.startsWith(expected) shouldBe true
+                    }
+                    msg.endsWith("a") shouldBe true
                 }
-                msg.endsWith("a") shouldBe true
-            }
+
+                // this tests an active ack failure to post from invoker
+                val args = Map("size" -> (allowedSize + 1).toJson, "char" -> 
"a".toJson)
+                val code = if (blocking) TestUtils.APP_ERROR else 
TestUtils.SUCCESS_EXIT
+                val rr = wsk.action.invoke(name, args, blocking = blocking, 
expectedExitCode = code)
+                if (blocking) {
+                    
checkResponse(wsk.parseJsonString(rr.stderr).convertTo[CliActivation])
+                } else {
+                    withActivation(wsk.activation, rr) { checkResponse(_) }
+                }
+        }
     }
 
     it should "succeed with one log line" in withAssetCleaner(wskprops) {
@@ -245,7 +257,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers {
                 (action, _) => action.create(name, Some(actionName), memory = 
Some(allowedMemory))
             }
 
-            for( a <- 1 to 10){
+            for (a <- 1 to 10) {
                 val run = wsk.action.invoke(name, Map("payload" -> 
"128".toJson))
                 withActivation(wsk.activation, run) { response =>
                     response.response.status shouldBe "success"

-- 
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.

Reply via email to