This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f571c3  Ensure ResultMessage is processed. (#4135)
7f571c3 is described below

commit 7f571c32bb8f3155c89f1d96fda4320909e097fd
Author: jiangpch <jiangpengch...@navercorp.com>
AuthorDate: Thu Nov 29 15:08:48 2018 +0800

    Ensure ResultMessage is processed. (#4135)
---
 .../ShardingContainerPoolBalancer.scala            | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 35a4547..4010cc1 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer(
 
   /** State related to invocations and throttling */
   protected[loadBalancer] val activations = TrieMap[ActivationId, 
ActivationEntry]()
+  protected[loadBalancer] val blockingPromises =
+    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
   private val totalActivationMemory = new LongAdder()
@@ -262,9 +264,13 @@ class ShardingContainerPoolBalancer(
 
     chosen
       .map { invoker =>
-        val entry = setupActivation(msg, action, invoker)
+        setupActivation(msg, action, invoker)
         sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          entry.promise.future
+          if (msg.blocking) {
+            blockingPromises.getOrElseUpdate(msg.activationId, 
Promise[Either[ActivationId, WhiskActivation]]()).future
+          } else {
+            Future.successful(Left(msg.activationId))
+          }
         }
       }
       .getOrElse {
@@ -313,8 +319,7 @@ class ShardingContainerPoolBalancer(
           action.limits.memory.megabytes.MB,
           action.limits.concurrency.maxConcurrent,
           action.fullyQualifiedName(true),
-          timeoutHandler,
-          Promise[Either[ActivationId, WhiskActivation]]())
+          timeoutHandler)
       })
   }
 
@@ -387,9 +392,7 @@ class ShardingContainerPoolBalancer(
     // Resolve the promise to send the result back to the user
     // The activation will be removed from `activations`-map later, when we 
receive the completion message, because the
     // slot of the invoker is not yet free for new activations.
-    activations.get(aid).map { entry =>
-      entry.promise.trySuccess(response)
-    }
+    blockingPromises.remove(aid).map(_.trySuccess(response))
     logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
@@ -422,13 +425,9 @@ class ShardingContainerPoolBalancer(
           .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, 
entry.maxConcurrent, entry.memory.toMB.toInt))
         if (!forced) {
           entry.timeoutHandler.cancel()
-          // If the action was blocking and the Resultmessage has been 
received before nothing will happen here.
-          // If the action was blocking and the ResultMessage is still 
missing, we pass the ActivationId. With this Id,
-          // the controller will get the result out of the database.
-          // If the action was non-blocking, we will close the promise here.
-          entry.promise.trySuccess(Left(aid))
         } else {
-          entry.promise.tryFailure(new Throwable("no completion ack received"))
+          // remove blocking promise when timeout, if the ResultMessage is 
already processed, this will do nothing
+          blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no 
completion ack received")))
         }
 
         logging.info(this, s"${if (!forced) "received" else "forced"} 
completion ack for '$aid'")(tid)
@@ -717,7 +716,6 @@ case class 
ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeout
  * @param namespaceId namespace that invoked the action
  * @param invokerName invoker the action is scheduled to
  * @param timeoutHandler times out completion of this activation, should be 
canceled on good paths
- * @param promise the promise to be completed by the activation
  */
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
@@ -725,5 +723,4 @@ case class ActivationEntry(id: ActivationId,
                            memory: ByteSize,
                            maxConcurrent: Int,
                            fullyQualifiedEntityName: FullyQualifiedEntityName,
-                           timeoutHandler: Cancellable,
-                           promise: Promise[Either[ActivationId, 
WhiskActivation]])
+                           timeoutHandler: Cancellable)

Reply via email to