This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 7f571c3 Ensure ResultMessage is processed. (#4135) 7f571c3 is described below commit 7f571c32bb8f3155c89f1d96fda4320909e097fd Author: jiangpch <jiangpengch...@navercorp.com> AuthorDate: Thu Nov 29 15:08:48 2018 +0800 Ensure ResultMessage is processed. (#4135) --- .../ShardingContainerPoolBalancer.scala | 29 ++++++++++------------ 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 35a4547..4010cc1 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer( /** State related to invocations and throttling */ protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]() + protected[loadBalancer] val blockingPromises = + TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]() private val activationsPerNamespace = TrieMap[UUID, LongAdder]() private val totalActivations = new LongAdder() private val totalActivationMemory = new LongAdder() @@ -262,9 +264,13 @@ class ShardingContainerPoolBalancer( chosen .map { invoker => - val entry = setupActivation(msg, action, invoker) + setupActivation(msg, action, invoker) sendActivationToInvoker(messageProducer, msg, invoker).map { _ => - entry.promise.future + if (msg.blocking) { + blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future + } else { + Future.successful(Left(msg.activationId)) + } } } .getOrElse { @@ -313,8 +319,7 @@ class ShardingContainerPoolBalancer( action.limits.memory.megabytes.MB, action.limits.concurrency.maxConcurrent, action.fullyQualifiedName(true), - timeoutHandler, - Promise[Either[ActivationId, WhiskActivation]]()) + timeoutHandler) }) } @@ -387,9 +392,7 @@ class ShardingContainerPoolBalancer( // Resolve the promise to send the result back to the user // The activation will be removed from `activations`-map later, when we receive the completion message, because the // slot of the invoker is not yet free for new activations. - activations.get(aid).map { entry => - entry.promise.trySuccess(response) - } + blockingPromises.remove(aid).map(_.trySuccess(response)) logging.info(this, s"received result ack for '$aid'")(tid) } @@ -422,13 +425,9 @@ class ShardingContainerPoolBalancer( .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt)) if (!forced) { entry.timeoutHandler.cancel() - // If the action was blocking and the Resultmessage has been received before nothing will happen here. - // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id, - // the controller will get the result out of the database. - // If the action was non-blocking, we will close the promise here. - entry.promise.trySuccess(Left(aid)) } else { - entry.promise.tryFailure(new Throwable("no completion ack received")) + // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing + blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received"))) } logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid) @@ -717,7 +716,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeout * @param namespaceId namespace that invoked the action * @param invokerName invoker the action is scheduled to * @param timeoutHandler times out completion of this activation, should be canceled on good paths - * @param promise the promise to be completed by the activation */ case class ActivationEntry(id: ActivationId, namespaceId: UUID, @@ -725,5 +723,4 @@ case class ActivationEntry(id: ActivationId, memory: ByteSize, maxConcurrent: Int, fullyQualifiedEntityName: FullyQualifiedEntityName, - timeoutHandler: Cancellable, - promise: Promise[Either[ActivationId, WhiskActivation]]) + timeoutHandler: Cancellable)