This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b803c64  Ensure that Result-ack is sent before Completion-ack. (#4115)
b803c64 is described below

commit b803c64366e273a0747c25c55873eb733a42f793
Author: Christian Bickel <git...@cbickel.de>
AuthorDate: Fri Nov 23 22:29:26 2018 +0100

    Ensure that Result-ack is sent before Completion-ack. (#4115)
    
    Improves comments to clarify the ordering of result and completion messages.
    Adds a type alias for the active ack messages, and documents the interface.
    
    Co-authored-by: Christian Bickel <cbic...@de.ibm.com>
    Co-authored-by: Rodric Rabbah <rod...@gmail.com>
---
 .../core/containerpool/ContainerProxy.scala        | 29 +++++++++++++++++-----
 .../openwhisk/core/invoker/InvokerReactive.scala   | 29 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0c126a2..0fdea5e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
 import pureconfig.loadConfigOrThrow
+
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext
 import org.apache.openwhisk.core.entity.ExecManifest.ImageName
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
 import org.apache.openwhisk.http.Messages
 
 import scala.concurrent.Future
@@ -127,7 +129,7 @@ case object RunCompleted
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, Boolean) => Future[Any],
+  sendActiveAck: ActiveAck,
   storeActivation: (TransactionId, WhiskActivation, UserContext) => 
Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InvokerInstanceId,
@@ -496,10 +498,16 @@ class ContainerProxy(
             ActivationResponse.whiskError(Messages.abnormalRun))
       }
 
-    // Sending active ack. Entirely asynchronous and not waited upon.
-    if (job.msg.blocking) {
-      activation.foreach(
+    // Sending an active ack is an asynchronous operation. The result is 
forwarded as soon as
+    // possible for blocking activations so that dependent activations can be 
scheduled. The
+    // completion message which frees a load balancer slot is sent after the 
active ack future
+    // completes to ensure proper ordering.
+    val sendResult = if (job.msg.blocking) {
+      activation.map(
         sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid, false))
+    } else {
+      // For non-blocking requests, do not forward the result.
+      Future.successful(())
     }
 
     val context = UserContext(job.msg.user)
@@ -530,8 +538,17 @@ class ContainerProxy(
     activationWithLogs
       .map(_.fold(_.activation, identity))
       .foreach { activation =>
-        // Sending the completionMessage to the controller asynchronously.
-        sendActiveAck(tid, activation, job.msg.blocking, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+        // Sending the completion message to the controller after the active 
ack ensures proper ordering
+        // (result is received before the completion message for blocking 
invokes).
+        sendResult.onComplete(
+          _ =>
+            sendActiveAck(
+              tid,
+              activation,
+              job.msg.blocking,
+              job.msg.rootControllerIndex,
+              job.msg.user.namespace.uuid,
+              true))
         // Storing the record. Entirely asynchronous and not waited upon.
         storeActivation(tid, activation, context)
       }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index da86843..ab30510 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -43,6 +43,23 @@ import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
+object InvokerReactive {
+
+  /**
+   * A method for sending Active Acknowledgements (aka "active ack") messages 
to the load balancer. These messages
+   * are either completion messages for an activation to indicate a resource 
slot is free, or result-forwarding
+   * messages for continuations (e.g., sequences and conductor actions).
+   *
+   * @param TransactionId the transaction id for the activation
+   * @param WhiskActivation is the activation result
+   * @param Boolean is true iff the activation was a blocking request
+   * @param ControllerInstanceId the originating controller/loadbalancer id
+   * @param UUID is the UUID for the namespace owning the activation
+   * @param Boolean is true if this is a resource-free message and false if this is 
a result-forwarding message
+   */
+  type ActiveAck = (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, Boolean) => Future[Any]
+}
+
 class InvokerReactive(
   config: WhiskConfig,
   instance: InvokerInstanceId,
@@ -115,12 +132,12 @@ class InvokerReactive(
   })
 
   /** Sends an active-ack. */
-  private val ack = (tid: TransactionId,
-                     activationResult: WhiskActivation,
-                     blockingInvoke: Boolean,
-                     controllerInstance: ControllerInstanceId,
-                     userId: UUID,
-                     isSlotFree: Boolean) => {
+  private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
+                                                activationResult: 
WhiskActivation,
+                                                blockingInvoke: Boolean,
+                                                controllerInstance: 
ControllerInstanceId,
+                                                userId: UUID,
+                                                isSlotFree: Boolean) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = 
false) = {

Reply via email to