This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 8930735 Send active ack on failed activations (#2384) 8930735 is described below commit 8930735e16eef37b22df57e3430bca6d75e0a6c7 Author: rodric rabbah <rod...@gmail.com> AuthorDate: Fri Jun 23 11:57:48 2017 -0400 Send active ack on failed activations (#2384) * Sends active ack for failed activations due to container startup failures. Add recovery in loadbalancer for when invoker dies and does not send active ack. Consolidate all blocking request handling into a single stateful actor. Modify completion message to respond with either id/result for when the active ack might exceed bus limit. Retry failure to send active ack from invoker. --- .../src/main/scala/whisk/common/Scheduler.scala | 56 ++++- .../main/scala/whisk/core/connector/Message.scala | 12 +- .../main/scala/whisk/core/entity/InstanceId.scala | 13 +- .../whisk/utils/ExecutionContextFactory.scala | 12 +- .../main/scala/whisk/core/controller/Actions.scala | 16 +- .../scala/whisk/core/controller/WebActions.scala | 17 +- .../controller/actions/PostActionActivation.scala | 26 ++- .../core/controller/actions/PrimitiveActions.scala | 229 +++++++++++++-------- .../core/controller/actions/SequenceActions.scala | 77 +++---- .../core/loadBalancer/LoadBalancerService.scala | 73 ++++--- .../main/scala/whisk/core/invoker/Invoker.scala | 49 +++-- .../scala/whisk/core/invoker/InvokerReactive.scala | 18 +- .../test/scala/whisk/common/SchedulerTests.scala | 60 ++++++ .../connector/tests/CompletionMessageTests.scala | 84 ++++++++ .../controller/test/ControllerTestCommon.scala | 4 +- .../core/controller/test/WebActionsApiTests.scala | 13 +- .../whisk/core/limits/ActionLimitsTests.scala | 52 +++-- 17 files changed, 542 insertions(+), 269 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala b/common/scala/src/main/scala/whisk/common/Scheduler.scala index 506760a..b51f75a 100644 --- 
a/common/scala/src/main/scala/whisk/common/Scheduler.scala +++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala @@ -25,6 +25,7 @@ import scala.util.Try import akka.actor.Actor import akka.actor.ActorSystem +import akka.actor.Cancellable import akka.actor.Props /** @@ -32,7 +33,8 @@ import akka.actor.Props * even for asynchronous tasks. */ object Scheduler { - private case object Work + case object WorkOnceNow + private case object ScheduledWork /** * Sets up an Actor to send itself a message to mimic schedulers behavior in a more controllable way. @@ -41,24 +43,40 @@ object Scheduler { * @param alwaysWait always wait for the given amount of time or calculate elapsed time to wait * @param closure the closure to be executed */ - private class Worker(interval: FiniteDuration, alwaysWait: Boolean, closure: () => Future[Any])(implicit logging: Logging) extends Actor { + private class Worker( + initialDelay: FiniteDuration, + interval: FiniteDuration, + alwaysWait: Boolean, + name: String, + closure: () => Future[Any])( + implicit logging: Logging, + transid: TransactionId) extends Actor { implicit val ec = context.dispatcher - override def preStart() = { - self ! 
Work + var lastSchedule: Option[Cancellable] = { + Some(context.system.scheduler.scheduleOnce(initialDelay, self, ScheduledWork)) + } + + override def postStop() = { + logging.info(this, s"$name shutdown") + lastSchedule.foreach(_.cancel()) } def receive = { - case Work => + case WorkOnceNow => Try(closure()) + + case ScheduledWork => val deadline = interval.fromNow Try(closure()) match { case Success(result) => result onComplete { _ => val timeToWait = if (alwaysWait) interval else deadline.timeLeft.max(Duration.Zero) // context might be null here if a PoisonPill is sent while doing computations - Option(context) foreach { _.system.scheduler.scheduleOnce(timeToWait, self, Work) } + lastSchedule = Option(context).map(_.system.scheduler.scheduleOnce(timeToWait, self, ScheduledWork)) } - case Failure(e) => logging.error(this, s"next iteration could not be scheduled because of ${e.getMessage}. Scheduler is halted") + + case Failure(e) => + logging.error(name, s"halted because ${e.getMessage}") } } } @@ -70,11 +88,19 @@ object Scheduler { * is immediately fired. * * @param interval the time to wait at most between two runs of the closure + * @param initialDelay optionally delay the first scheduled iteration by given duration * @param f the function to run */ - def scheduleWaitAtMost(interval: FiniteDuration)(f: () => Future[Any])(implicit system: ActorSystem, logging: Logging) = { + def scheduleWaitAtMost( + interval: FiniteDuration, + initialDelay: FiniteDuration = Duration.Zero, + name: String = "Scheduler")( + f: () => Future[Any])( + implicit system: ActorSystem, + logging: Logging, + transid: TransactionId = TransactionId.unknown) = { require(interval > Duration.Zero) - system.actorOf(Props(new Worker(interval, false, f))) + system.actorOf(Props(new Worker(initialDelay, interval, false, name, f))) } /** @@ -83,10 +109,18 @@ object Scheduler { * given interval. 
* * @param interval the time to wait between two runs of the closure + * @param initialDelay optionally delay the first scheduled iteration by given duration * @param f the function to run */ - def scheduleWaitAtLeast(interval: FiniteDuration)(f: () => Future[Any])(implicit system: ActorSystem, logging: Logging) = { + def scheduleWaitAtLeast( + interval: FiniteDuration, + initialDelay: FiniteDuration = Duration.Zero, + name: String = "Scheduler")( + f: () => Future[Any])( + implicit system: ActorSystem, + logging: Logging, + transid: TransactionId = TransactionId.unknown) = { require(interval > Duration.Zero) - system.actorOf(Props(new Worker(interval, true, f))) + system.actorOf(Props(new Worker(initialDelay, interval, true, name, f))) } } diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index c895690..3491c21 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -91,20 +91,22 @@ object ActivationMessage extends DefaultJsonProtocol { */ case class CompletionMessage( override val transid: TransactionId, - response: WhiskActivation, + response: Either[ActivationId, WhiskActivation], invoker: String) extends Message { - override def serialize = CompletionMessage.serdes.write(this).compactPrint + override def serialize: String = { + CompletionMessage.serdes.write(this).compactPrint + } override def toString = { - s"${response.activationId}" + response.fold(l => l, r => r.activationId).asString } } object CompletionMessage extends DefaultJsonProtocol { - def parse(msg: String) = Try(serdes.read(msg.parseJson)) - implicit val serdes = jsonFormat3(CompletionMessage.apply) + def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson)) + private val serdes = jsonFormat3(CompletionMessage.apply) } case class PingMessage(name: String) extends Message { diff --git 
a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala index f573b04..03f3271 100644 --- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala +++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala @@ -1,11 +1,12 @@ /* - * Copyright 2015-2016 IBM Corporation + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala index b002981..21971b0 100644 --- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala +++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala @@ -50,18 +50,10 @@ object ExecutionContextFactory { implicit val ec = system.dispatcher firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))) } - } - /** - * Extends a promise with an scheduled call back. The call back may be used to complete the promise. The result of the - * call back is not interesting to this method. - * The idiom to use is: promise after(duration, promise.tryFailure(TimeoutException)`. 
- */ - implicit class PromiseExtensions[T](p: Promise[T]) { - def after(timeout: FiniteDuration, next: => Unit)(implicit system: ActorSystem): Promise[T] = { + def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])(implicit system: ActorSystem): Future[T] = { implicit val ec = system.dispatcher - expire(timeout, system.scheduler)(Future { next }) - p + firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt))) } } diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala index e685bf4..6c1db7c 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala @@ -37,7 +37,6 @@ import spray.json.DefaultJsonProtocol._ import spray.routing.RequestContext import whisk.common.TransactionId import whisk.core.WhiskConfig -import whisk.core.controller.actions.BlockingInvokeTimeout import whisk.core.controller.actions.PostActionActivation import whisk.core.database.NoDocumentException import whisk.core.entitlement._ @@ -59,7 +58,10 @@ object WhiskActionsApi { /** Grace period after action timeout limit to poll for result. */ protected[core] val blockingInvokeGrace = 5 seconds - /** Max duration to wait for a blocking activation. */ + /** + * Max duration to wait for a blocking activation. + * This is the default timeout on a POST request. 
+ */ protected[core] val maxWaitForBlockingActivation = 60 seconds } @@ -221,11 +223,12 @@ trait WhiskActionsApi onComplete(entitleReferencedEntities(user, Privilege.ACTIVATE, Some(action.exec))) { case Success(_) => val actionWithMergedParams = env.map(action.inherit(_)) getOrElse action - onComplete(invokeAction(user, actionWithMergedParams, payload, blocking, Some(waitOverride))) { - case Success((activationId, None)) => + val waitForResponse = if (blocking) Some(waitOverride) else None + onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, cause = None)) { + case Success(Left(activationId)) => // non-blocking invoke or blocking invoke which got queued instead complete(Accepted, activationId.toJsObject) - case Success((activationId, Some(activation))) => + case Success(Right(activation)) => val response = if (result) activation.resultAsJson else activation.toExtendedJson if (activation.response.isSuccess) { @@ -243,9 +246,6 @@ trait WhiskActionsApi } else { complete(InternalServerError, response) } - case Failure(t: BlockingInvokeTimeout) => - logging.info(this, s"[POST] action activation waiting period expired") - complete(Accepted, t.activationId.toJsObject) case Failure(t: RecordTooLargeException) => logging.info(this, s"[POST] action payload was too large") terminate(RequestEntityTooLarge) diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala index 9d30d70..c05c4c0 100644 --- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala @@ -43,7 +43,6 @@ import spray.routing.RequestContext import spray.routing.Route import spray.http.HttpMethods.{ OPTIONS, GET, DELETE, POST, PUT, HEAD, PATCH } import whisk.common.TransactionId -import whisk.core.controller.actions.BlockingInvokeTimeout import whisk.core.controller.actions.PostActionActivation 
import whisk.core.database._ import whisk.core.entity._ @@ -336,6 +335,8 @@ trait WhiskWebActionsApi def routes(user: Identity)(implicit transid: TransactionId): Route = routes(Some(user)) def routes()(implicit transid: TransactionId): Route = routes(None) + private val maxWaitForWebActionResult = Some(WhiskActionsApi.maxWaitForBlockingActivation) + /** * Adds route to web based activations. Actions invoked this way are anonymous in that the * caller is not authenticated. The intended action must be named in the path as a fully qualified @@ -540,8 +541,7 @@ trait WhiskWebActionsApi // they will be overwritten if (isRawHttpAction || context.overrides(webApiDirectives.reservedProperties ++ action.immutableParameters).isEmpty) { val content = context.toActionArgument(onBehalfOf, isRawHttpAction) - val waitOverride = Some(WhiskActionsApi.maxWaitForBlockingActivation) - invokeAction(actionOwnerIdentity, action, Some(JsObject(content)), blocking = true, waitOverride) + invokeAction(actionOwnerIdentity, action, Some(JsObject(content)), maxWaitForWebActionResult, cause = None) } else { Future.failed(RejectRequest(BadRequest, Messages.parametersNotAllowed)) } @@ -551,12 +551,12 @@ trait WhiskWebActionsApi } private def completeRequest( - queuedActivation: Future[(ActivationId, Option[WhiskActivation])], + queuedActivation: Future[Either[ActivationId, WhiskActivation]], projectResultField: => List[String], responseType: MediaExtension)( implicit transid: TransactionId) = { onComplete(queuedActivation) { - case Success((activationId, Some(activation))) => + case Success(Right(activation)) => val result = activation.resultAsJson if (activation.response.isSuccess || activation.response.isApplicationError) { @@ -583,14 +583,9 @@ trait WhiskWebActionsApi terminate(BadRequest, Messages.errorProcessingRequest) } - case Success((activationId, None)) => + case Success(Left(activationId)) => // blocking invoke which got queued instead // this should not happen, instead it should be a 
blocking invoke timeout - logging.warn(this, "activation returned an id, expecting timeout error instead") - terminate(Accepted, Messages.responseNotReady) - - case Failure(t: BlockingInvokeTimeout) => - // blocking invoke which timed out waiting on response logging.info(this, "activation waiting period expired") terminate(Accepted, Messages.responseNotReady) diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala index eec3399..5e6be7d 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala @@ -18,14 +18,12 @@ package whisk.core.controller.actions import scala.concurrent.Future -import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration import spray.http.StatusCodes.BadRequest import spray.json._ import whisk.common.TransactionId import whisk.core.controller.RejectRequest -import whisk.core.controller.WhiskActionsApi._ import whisk.core.controller.WhiskServices import whisk.core.entity._ import whisk.http.Messages @@ -40,25 +38,25 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc * @param user the user posting the activation * @param action the action to activate (parameters for packaged actions must already be merged) * @param payload the parameters to pass to the action - * @param blocking iff true, wait for the activation result - * @param waitOverride iff blocking, wait up up to the action limit or a given max duration - * @return a future that resolves with the (activation id, and some whisk activation if a blocking invoke) + * @param waitForResponse if not empty, wait up to specified duration for a response (this is used for blocking activations) + * @return a future that resolves with Left(activation id) when the request is 
queued, or Right(activation) for a blocking request + * which completes in time iff waiting for an response */ - protected[controller] def invokeAction(user: Identity, action: WhiskAction, payload: Option[JsObject], blocking: Boolean, waitOverride: Option[FiniteDuration] = None)( - implicit transid: TransactionId): Future[(ActivationId, Option[WhiskActivation])] = { + protected[controller] def invokeAction( + user: Identity, + action: WhiskAction, + payload: Option[JsObject], + waitForResponse: Option[FiniteDuration], + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { action.exec match { // this is a topmost sequence case SequenceExec(components) => - val futureSeqTuple = invokeSequence(user, action, payload, blocking, topmost = true, components, cause = None, 0) - futureSeqTuple map { case (activationId, wskActivation, _) => (activationId, wskActivation) } + invokeSequence(user, action, components, payload, waitForResponse, cause, topmost = true, 0).map(r => r._1) case supportedExec if !supportedExec.deprecated => - val duration = action.limits.timeout.duration + blockingInvokeGrace - val timeout = waitOverride.getOrElse(duration) - invokeSingleAction(user, action, payload, timeout, blocking) + invokeSingleAction(user, action, payload, waitForResponse, cause) case deprecatedExec => Future.failed(RejectRequest(BadRequest, Messages.runtimeDeprecated(deprecatedExec))) } } } - -protected[controller] case class BlockingInvokeTimeout(activationId: ActivationId) extends TimeoutException diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index 0911a75..0a9034b 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -20,21 +20,28 @@ package 
whisk.core.controller.actions import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise -import scala.concurrent.TimeoutException import scala.concurrent.duration._ +import scala.util.Failure +import akka.actor.Actor import akka.actor.ActorSystem +import akka.actor.Props + import spray.json._ + import whisk.common.Logging import whisk.common.LoggingMarkers +import whisk.common.Scheduler import whisk.common.TransactionId import whisk.core.connector.ActivationMessage -import whisk.core.controller.WhiskActionsApi import whisk.core.controller.WhiskServices import whisk.core.database.NoDocumentException import whisk.core.entity._ import whisk.core.entity.types.ActivationStore import whisk.core.entity.types.EntityStore +import whisk.utils.ExecutionContextFactory.FutureExtensions +import scala.collection.mutable.Buffer +import akka.actor.Cancellable protected[actions] trait PrimitiveActions { /** The core collections require backend services to be injected in this trait. */ @@ -60,15 +67,11 @@ protected[actions] trait PrimitiveActions { /** Database service to get activations. */ protected val activationStore: ActivationStore - /** Max duration for active ack. */ - protected val activeAckTimeout = WhiskActionsApi.maxWaitForBlockingActivation - /** - * Gets document from datastore to confirm a valid action activation then posts request to loadbalancer. - * If the loadbalancer accepts the requests with an activation id, then wait for the result of the activation - * if this is a blocking invoke, else return the activation id. + * Posts request to the loadbalancer. If the loadbalancer accepts the requests with an activation id, + * then wait for the result of the activation if necessary. * - * NOTE: This is a point-in-time type of statement: + * NOTE: * For activations of actions, cause is populated only for actions that were invoked as a result of a sequence activation. 
* For actions that are enclosed in a sequence and are activated as a result of the sequence activation, the cause * contains the activation id of the immediately enclosing sequence. @@ -77,23 +80,25 @@ protected[actions] trait PrimitiveActions { * cause for c is the activation id of x * cause for s is not defined * - * @param subject the subject invoking the action - * @param docid the action document id + * @param user the identity invoking the action + * @param action the action to invoke * @param payload the dynamic arguments for the activation - * @param timeout the timeout used for polling for result if the invoke is blocking - * @param blocking true iff this is a blocking invoke + * @param waitForResponse if not empty, wait upto specified duration for a response (this is used for blocking activations) * @param cause the activation id that is responsible for this invoke/activation * @param transid a transaction id for logging - * @return a promise that completes with (ActivationId, Some(WhiskActivation)) if blocking else (ActivationId, None) + * @return a promise that completes with one of the following successful cases: + * Right(WhiskActivation) if waiting for a response and response is ready within allowed duration, + * Left(ActivationId) if not waiting for a response, or allowed duration has elapsed without a result ready + * or these custom failures: + * RequestEntityTooLarge if the message is too large to to post to the message bus */ protected[actions] def invokeSingleAction( user: Identity, action: WhiskAction, payload: Option[JsObject], - timeout: FiniteDuration, - blocking: Boolean, - cause: Option[ActivationId] = None)( - implicit transid: TransactionId): Future[(ActivationId, Option[WhiskActivation])] = { + waitForResponse: Option[FiniteDuration], + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { require(action.exec.kind != Exec.SEQUENCE, "this method requires a primitive action") 
// merge package parameters with action (action parameters supersede), then merge in payload @@ -109,95 +114,137 @@ protected[actions] trait PrimitiveActions { args, cause = cause) - val startActivation = transid.started(this, if (blocking) LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING else LoggingMarkers.CONTROLLER_ACTIVATION) - val startLoadbalancer = transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"[POST] action activation id: ${message.activationId}") - val postedFuture = loadBalancer.publish(action, message, activeAckTimeout) - postedFuture flatMap { activationResponse => + val startActivation = transid.started(this, waitForResponse.map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING).getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION)) + val startLoadbalancer = transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${message.activationId}") + val postedFuture = loadBalancer.publish(action, message) + + postedFuture.flatMap { activeAckResponse => + // successfully posted activation request to the message bus transid.finished(this, startLoadbalancer) - if (blocking) { - waitForActivationResponse(user, message.activationId, timeout, activationResponse) map { - whiskActivation => (whiskActivation.activationId, Some(whiskActivation)) - } andThen { - case _ => transid.finished(this, startActivation) - } - } else { + + // is caller waiting for the result of the activation? 
+ waitForResponse.map { timeout => + // yes, then wait for the activation response from the message bus + // (known as the active response or active ack) + waitForActivationResponse(user, message.activationId, timeout, activeAckResponse) + .andThen { case _ => transid.finished(this, startActivation) } + }.getOrElse { + // no, return the activation id transid.finished(this, startActivation) - Future.successful { (message.activationId, None) } + Future.successful(Left(message.activationId)) } } } /** - * This is a fast path used for blocking calls in which we do not need the full WhiskActivation record from the DB. - * Polls for the activation response from an underlying data structure populated from Kafka active acknowledgements. - * If this mechanism fails to produce an answer quickly, the future will switch to polling the database for the response - * record. + * Waits for a response from the message bus (e.g., Kafka) containing the result of the activation. This is the fast path + * used for blocking calls where only the result of the activation is needed. This path is called active acknowledgement + * or active ack. + * + * While waiting for the active ack, periodically poll the datastore in case there is a failure in the fast path delivery + * which could happen if the connection from an invoker to the message bus is disrupted, or if the publishing of the response + * fails because the message is too large. */ - private def waitForActivationResponse(user: Identity, activationId: ActivationId, totalWaitTime: FiniteDuration, activationResponse: Future[WhiskActivation])(implicit transid: TransactionId) = { - // this is the promise which active ack or db polling will try to complete in one of four ways: - // 1. 
active ack response + private def waitForActivationResponse( + user: Identity, + activationId: ActivationId, + totalWaitTime: FiniteDuration, + activeAckResponse: Future[Either[ActivationId, WhiskActivation]])( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { + // this is the promise which active ack or db polling will try to complete via: + // 1. active ack response, or // 2. failing active ack (due to active ack timeout), fall over to db polling // 3. timeout on db polling => converts activation to non-blocking (returns activation id only) - // 4. internal error - val promise = Promise[WhiskActivation] - val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId)) - - logging.info(this, s"[POST] action activation will block on result up to $totalWaitTime") - - // the active ack will timeout after specified duration, causing the db polling to kick in - activationResponse map { - activation => promise.trySuccess(activation) - } onFailure { - case t: TimeoutException => - logging.info(this, s"[POST] switching to poll db, active ack expired") - pollDbForResult(docid, activationId, promise) - case t: Throwable => - logging.info(this, s"[POST] switching to poll db, active ack exception: ${t.getMessage}") - pollDbForResult(docid, activationId, promise) + // 4. internal error message + val promise = Promise[Either[ActivationId, WhiskActivation]] // this is strictly completed by the finishing actor + val finisher = actorSystem.actorOf(Props(new ActivationFinisher(user, activationId, promise))) + + logging.info(this, s"action activation will block for result upto $totalWaitTime") + + activeAckResponse map { + case result @ Right(_) => + // activation complete, result is available + finisher ! 
Finish(result) + + case _ => + // active ack received but it does not carry the response, + // no result available except by polling the db + logging.warn(this, "pre-emptively polling db because active ack is missing result") + finisher ! Scheduler.WorkOnceNow } - // install a timeout handler; this is the handler for "the action took longer than its Limit" - // note the use of WeakReferences; this is to avoid the handler's closure holding on to the - // WhiskActivation, which in turn holds on to the full JsObject of the response - val promiseRef = new java.lang.ref.WeakReference(promise) - actorSystem.scheduler.scheduleOnce(totalWaitTime) { - val p = promiseRef.get - if (p != null) { - p.tryFailure(new BlockingInvokeTimeout(activationId)) + // return the promise which is either fulfilled by active ack, polling from the database, + // or the timeout alternative when the allowed duration expires (i.e., the action took + // longer than the permitted, per totalWaitTime). + promise.future.withAlternativeAfterTimeout(totalWaitTime, { + Future.successful(Left(activationId)).andThen { + // result no longer interesting; terminate the finisher/shut down db polling if necessary + case _ => actorSystem.stop(finisher) } - } - - // return the future. note that pollDbForResult's isCompleted check will protect against unnecessary db activity - // that may overlap with a totalWaitTime timeout (because the promise will have already by tryFailure'd) - promise.future + }) } - /** - * Polls for activation record. It is assumed that an activation record is created atomically and never updated. - * Fetch the activation record by its id. If it exists, complete the promise. Otherwise recursively poll until - * either there is an error in the get, or the promise has completed because it timed out. The promise MUST - * complete in the caller to terminate the polling. - */ - private def pollDbForResult( - docid: DocId, + /** Periodically polls the db to cover missing active acks. 
*/ + val datastorePollPeriodForActivation = 15.seconds + val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds) + + protected case class Finish(activation: Right[ActivationId, WhiskActivation]) + + protected class ActivationFinisher( + user: Identity, activationId: ActivationId, - promise: Promise[WhiskActivation])( - implicit transid: TransactionId): Unit = { - // check if promise already completed due to timeout expiration (abort polling if so) - if (!promise.isCompleted) { - WhiskActivation.get(activationStore, docid) map { - activation => promise.trySuccess(activation.withoutLogs) // Logs always not provided on blocking call - } onFailure { - case e: NoDocumentException => - Thread.sleep(500) - logging.debug(this, s"[POST] action activation not yet timed out, will poll for result") - pollDbForResult(docid, activationId, promise) - case t: Throwable => - logging.error(this, s"[POST] action activation failed while waiting on result: ${t.getMessage}") - promise.tryFailure(t) - } - } else { - logging.info(this, s"[POST] action activation timed out, terminated polling for result") + promise: Promise[Either[ActivationId, WhiskActivation]])( + implicit transid: TransactionId) extends Actor { + + // when the future completes, self-destruct + promise.future.andThen { case _ => shutdown() } + + val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId)) + val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty + + val poller = { + Scheduler.scheduleWaitAtMost( + datastorePollPeriodForActivation, + initialDelay = datastorePollPeriodForActivation, + name = "dbpoll")(() => { + if (!promise.isCompleted) { + WhiskActivation.get(activationStore, docid) map { + // complete the future, which in turn will poison pill this scheduler + activation => + promise.trySuccess(Right(activation.withoutLogs)) // Logs always not provided on blocking call + } andThen { + case Failure(e: NoDocumentException) => // do nothing, scheduler will 
reschedule another poll + case Failure(t: Throwable) => // something went wrong, abort + logging.error(this, s"failed while waiting on result: ${t.getMessage}") + promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler + } + } else Future.successful({}) // the scheduler will be halted because the promise is now resolved + }) + } + + def receive = { + case Finish(activation) => + promise.trySuccess(activation) + + case msg @ Scheduler.WorkOnceNow => + // try up to three times when pre-emptying the schedule + datastorePreemptivePolling.foreach { + s => preemptiveMsgs += context.system.scheduler.scheduleOnce(s, poller, msg) + } + + case msg => + poller ! msg + } + + def shutdown(): Unit = { + context.stop(poller) + context.stop(self) + } + + override def postStop() = { + logging.info(this, "finisher shutdown") + preemptiveMsgs.foreach(_.cancel()) + context.stop(poller) } } } diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index 042bc08..4ec48d8 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -30,10 +30,10 @@ import scala.util.Failure import scala.util.Success import akka.actor.ActorSystem + import spray.json._ import whisk.common.Logging import whisk.common.TransactionId -import whisk.core.controller.WhiskActionsApi._ import whisk.core.controller.WhiskServices import whisk.core.entity._ import whisk.core.entity.size.SizeInt @@ -60,14 +60,13 @@ protected[actions] trait SequenceActions { protected val activationStore: ActivationStore /** A method that knows how to invoke a single primitive action. 
*/ - protected[actions] def invokeSingleAction( + protected[actions] def invokeAction( user: Identity, action: WhiskAction, payload: Option[JsObject], - timeout: FiniteDuration, - blocking: Boolean, - cause: Option[ActivationId] = None)( - implicit transid: TransactionId): Future[(ActivationId, Option[WhiskActivation])] + waitForResponse: Option[FiniteDuration], + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] /** * Executes a sequence by invoking in a blocking fashion each of its components. @@ -86,13 +85,13 @@ protected[actions] trait SequenceActions { protected[actions] def invokeSequence( user: Identity, action: WhiskAction, - payload: Option[JsObject], - blocking: Boolean, - topmost: Boolean, components: Vector[FullyQualifiedEntityName], + payload: Option[JsObject], + waitForOutermostResponse: Option[FiniteDuration], cause: Option[ActivationId], + topmost: Boolean, atomicActionsCount: Int)( - implicit transid: TransactionId): Future[(ActivationId, Option[WhiskActivation], Int)] = { + implicit transid: TransactionId): Future[(Either[ActivationId, WhiskActivation], Int)] = { require(action.exec.kind == Exec.SEQUENCE, "this method requires an action sequence") // create new activation id that corresponds to the sequence @@ -100,7 +99,10 @@ protected[actions] trait SequenceActions { logging.info(this, s"invoking sequence $action topmost $topmost activationid '$seqActivationId'") val start = Instant.now(Clock.systemUTC()) - val futureSeqResult = { + val futureSeqResult: Future[(Either[ActivationId, WhiskActivation], Int)] = { + // even though the result of completeSequenceActivation is Right[WhiskActivation], + // use a more general type for futureSeqResult in case a blocking invoke takes + // longer than expected and we must return Left[ActivationId] instead completeSequenceActivation( seqActivationId, // the cause for the component activations is the current sequence @@ -109,17 +111,16 @@ 
protected[actions] trait SequenceActions { } if (topmost) { // need to deal with blocking and closing connection - if (blocking) { + waitForOutermostResponse.map { timeout => logging.info(this, s"invoke sequence blocking topmost!") - val timeout = maxWaitForBlockingActivation + blockingInvokeGrace - // if the future fails with a timeout, the failure is dealt with at the caller level - futureSeqResult.withTimeout(timeout, new BlockingInvokeTimeout(seqActivationId)) - } else { + futureSeqResult.withAlternativeAfterTimeout(timeout, Future.successful(Left(seqActivationId), atomicActionsCount)) + }.getOrElse { // non-blocking sequence execution, return activation id - Future.successful((seqActivationId, None, 0)) + Future.successful(Left(seqActivationId), 0) } } else { // not topmost, no need to worry about terminating incoming request + // and this is a blocking activation therefore by definition // Note: the future for the sequence result recovers from all throwable failures futureSeqResult } @@ -136,20 +137,20 @@ protected[actions] trait SequenceActions { topmost: Boolean, start: Instant, cause: Option[ActivationId])( - implicit transid: TransactionId): Future[(ActivationId, Some[WhiskActivation], Int)] = { + implicit transid: TransactionId): Future[(Right[ActivationId, WhiskActivation], Int)] = { // not topmost, no need to worry about terminating incoming request // Note: the future for the sequence result recovers from all throwable failures futureSeqResult.map { accounting => // sequence terminated, the result of the sequence is the result of the last completed activation val end = Instant.now(Clock.systemUTC()) val seqActivation = makeSequenceActivation(user, action, seqActivationId, accounting, topmost, cause, start, end) - (seqActivationId, Some(seqActivation), accounting.atomicActionCnt) + (Right(seqActivation), accounting.atomicActionCnt) }.andThen { - case Success((_, Some(seqActivation), _)) => storeSequenceActivation(seqActivation) - case Failure(t) => - 
// This should never happen; in this case, there is no activation record created or stored: - // should there be? - logging.error(this, s"Sequence activation failed: ${t.getMessage}") + case Success((Right(seqActivation), _)) => storeSequenceActivation(seqActivation) + + // This should never happen; in this case, there is no activation record created or stored: + // should there be? + case Failure(t) => logging.error(this, s"Sequence activation failed: ${t.getMessage}") } } @@ -312,27 +313,27 @@ protected[actions] trait SequenceActions { val futureWhiskActivationTuple = action.exec match { case SequenceExec(components) => logging.info(this, s"sequence invoking an enclosed sequence $action") - // call invokeSequence to invoke the inner sequence - invokeSequence(user, action, inputPayload, blocking = true, topmost = false, components, cause, accounting.atomicActionCnt) + // call invokeSequence to invoke the inner sequence; this is a blocking activation by definition + invokeSequence(user, action, components, inputPayload, None, cause, topmost = false, accounting.atomicActionCnt) case _ => // this is an invoke for an atomic action logging.info(this, s"sequence invoking an enclosed atomic action $action") - val timeout = action.limits.timeout.duration + blockingInvokeGrace - invokeSingleAction(user, action, inputPayload, timeout, blocking = true, cause) map { - case (activationId, wskActivation) => (activationId, wskActivation, accounting.atomicActionCnt + 1) + val timeout = action.limits.timeout.duration + 1.minute + invokeAction(user, action, inputPayload, waitForResponse = Some(timeout), cause) map { + case res => (res, accounting.atomicActionCnt + 1) } } futureWhiskActivationTuple.map { - case (activationId, wskActivation, atomicActionCountSoFar) => - wskActivation.map { - activation => accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit) - }.getOrElse { - // the wskActivation is None only if the result could not be retrieved in time either 
from active ack or from db - logging.error(this, s"component activation timedout for $activationId") - val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId)) - accounting.fail(activationResponse, Some(activationId)) - } + case (Right(activation), atomicActionCountSoFar) => + accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit) + + case (Left(activationId), atomicActionCountSoFar) => + // the result could not be retrieved in time either from active ack or from db + logging.error(this, s"component activation timedout for $activationId") + val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId)) + accounting.fail(activationResponse, Some(activationId)) + }.recover { // check any failure here and generate an activation response to encapsulate // the failure mode; consider this failure a whisk error diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 636f157..5390a3a 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -25,9 +25,7 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise -import scala.concurrent.TimeoutException import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration import scala.util.Failure import scala.util.Success @@ -56,6 +54,8 @@ import scala.annotation.tailrec trait LoadBalancer { + val activeAckTimeoutGrace = 1.minute + /** * Retrieves a per subject map of counts representing in-flight activations as seen by the load balancer * @@ -64,16 +64,18 @@ trait LoadBalancer { def getActiveUserActivationCounts: Map[String, Int] /** - * Publishes activation message on 
internal bus for the invoker to pick up. + * Publishes activation message on internal bus for an invoker to pick up. * + * @param action the action to invoke * @param msg the activation message to publish on an invoker topic - * @param timeout the desired active ack timeout * @param transid the transaction id for the request * @return result a nested Future the outer indicating completion of publishing and * the inner the completion of the action (i.e., the result) - * if it is ready before timeout otherwise the future fails with ActiveAckTimeout + * if it is ready before timeout (Right) otherwise the activation id (Left). + * The future is guaranteed to complete within the declared action time limit + * plus a grace period (see activeAckTimeoutGrace). */ - def publish(action: WhiskAction, msg: ActivationMessage, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Future[WhiskActivation]] + def publish(action: WhiskAction, msg: ActivationMessage)(implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] } @@ -88,11 +90,11 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore override def getActiveUserActivationCounts: Map[String, Int] = activationBySubject.toMap mapValues { _.size } - override def publish(action: WhiskAction, msg: ActivationMessage, timeout: FiniteDuration)( - implicit transid: TransactionId): Future[Future[WhiskActivation]] = { + override def publish(action: WhiskAction, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { chooseInvoker(action, msg).flatMap { invokerName => val subject = msg.user.subject.asString - val entry = setupActivation(msg.activationId, subject, invokerName, timeout, transid) + val entry = setupActivation(action, msg.activationId, subject, invokerName, transid) sendActivationToInvoker(messageProducer, msg, invokerName).map { _ => entry.promise.future } @@ -105,7 +107,7 @@ class 
LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore * A map storing current activations based on ActivationId. * The promise value represents the obligation of writing the answer back. */ - case class ActivationEntry(id: ActivationId, subject: String, invokerName: String, created: Instant, promise: Promise[WhiskActivation]) + case class ActivationEntry(id: ActivationId, subject: String, invokerName: String, created: Instant, promise: Promise[Either[ActivationId, WhiskActivation]]) type TrieSet[T] = TrieMap[T, Unit] private val activationById = new TrieMap[ActivationId, ActivationEntry] private val activationByInvoker = new TrieMap[String, TrieSet[ActivationEntry]] @@ -117,42 +119,44 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore * * @param msg is the kafka message payload as Json */ - private def processCompletion(msg: CompletionMessage) = { - implicit val tid = msg.transid - val aid = msg.response.activationId - logging.info(this, s"received active ack for '$aid'") - val response = msg.response + private def processCompletion(response: Either[ActivationId, WhiskActivation], tid: TransactionId, forced: Boolean): Unit = { + val aid = response.fold(l => l, r => r.activationId) activationById.remove(aid) match { case Some(entry @ ActivationEntry(_, subject, invokerIndex, _, p)) => + logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) activationByInvoker.getOrElseUpdate(invokerIndex, new TrieSet[ActivationEntry]).remove(entry) activationBySubject.getOrElseUpdate(subject, new TrieSet[ActivationEntry]).remove(entry) - p.trySuccess(response) - logging.info(this, s"processed active response for '$aid'") + if (!forced) { + p.trySuccess(response) + } else { + p.tryFailure(new Throwable("no active ack received")) + } + case None => - logging.warn(this, s"processed active response for '$aid' which has no entry") + // the entry was already removed + logging.debug(this, s"received 
active ack for '$aid' which has no entry")(tid) } } /** * Creates an activation entry and insert into various maps. */ - private def setupActivation(activationId: ActivationId, subject: String, invokerName: String, timeout: FiniteDuration, transid: TransactionId): ActivationEntry = { + private def setupActivation(action: WhiskAction, activationId: ActivationId, subject: String, invokerName: String, transid: TransactionId): ActivationEntry = { // either create a new promise or reuse a previous one for this activation if it exists + val timeout = action.limits.timeout.duration + activeAckTimeoutGrace val entry = activationById.getOrElseUpdate(activationId, { - - // install a timeout handler; this is the handler for "the action took longer than ActiveAckTimeout" - // note the use of WeakReferences; this is to avoid the handler's closure holding on to the - // WhiskActivation, which in turn holds on to the full JsObject of the response - // NOTE: we do not remove the entry from the maps, as this is done only by processCompletion - val promiseRef = new java.lang.ref.WeakReference(Promise[WhiskActivation]) + val promiseRef = new java.lang.ref.WeakReference(Promise[Either[ActivationId, WhiskActivation]]) + + // Install a timeout handler for the catastrophic case where an active ack is not received at all + // (say, because an invoker is down completely, or the connection to the message bus is disrupted) or when + // the active ack is significantly delayed (possibly due to long queues, but the subject should not be penalized); + // in this case, if the activation handler is still registered, remove it and update the books. + // Note the use of WeakReferences; this is to avoid the handler's closure holding on to the + // WhiskActivation, which in turn holds on to the full JsObject of the response.
actorSystem.scheduler.scheduleOnce(timeout) { - activationById.get(activationId).foreach { _ => - val p = promiseRef.get - if (p != null && p.tryFailure(new ActiveAckTimeout(activationId))) { - logging.info(this, "active response timed out")(transid) - } - } + processCompletion(Left(activationId), transid, forced = true) } + ActivationEntry(activationId, subject, invokerName, Instant.now(Clock.systemUTC()), promiseRef.get) }) @@ -238,8 +242,10 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore val raw = new String(bytes, StandardCharsets.UTF_8) CompletionMessage.parse(raw) match { case Success(m: CompletionMessage) => - processCompletion(m) - invokerPool ! InvocationFinishedMessage(m.invoker, !m.response.response.isWhiskError) + processCompletion(m.response, m.transid, false) + // treat left as success (as it is the result of a message exceeding the bus limit) + val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError) + invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess) case Failure(t) => logging.error(this, s"failed processing message: $raw with $t") } @@ -384,5 +390,4 @@ object LoadBalancerService { } -private case class ActiveAckTimeout(activationId: ActivationId) extends TimeoutException private case class LoadBalancerException(msg: String) extends Throwable(msg) diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 813cc6b..a655023 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -47,6 +47,11 @@ import whisk.core.entity._ import whisk.http.BasicHttpService import whisk.http.Messages import whisk.utils.ExecutionContextFactory +import whisk.common.Scheduler +import whisk.core.connector.PingMessage +import scala.util.Try +import whisk.core.connector.MessageProducer +import
org.apache.kafka.common.errors.RecordTooLargeException /** * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke". @@ -92,7 +97,7 @@ class Invoker( // hence when the transaction is fully processed, this method will complete a promise with the datastore // future writing back the activation record and for which there are three cases: // 1. success: there were no exceptions and hence the invoke path operated normally, - // 2. error during invocation: an exception occurred while trying to run the action, + // 2. error during invocation: an exception occurred while trying to run the action (failed to bring up a container for example), // 3. error fetching action: an exception occurred reading from the db, didn't get to run. val transactionPromise = Promise[DocInfo] @@ -102,6 +107,7 @@ class Invoker( if (actionid.rev == DocRevision.empty) { logging.error(this, s"revision was not provided for ${actionid.id}") } + WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) onComplete { case Success(action) => // only Exec instances that are subtypes of CodeExec reach the invoker @@ -143,8 +149,12 @@ class Invoker( implicit transid: TransactionId): Future[DocInfo] = { val msg = tran.msg val interval = computeActivationInterval(tran) - val activation = makeWhiskActivation(msg, EntityPath(name.id), version, response, interval, limits) - completeTransaction(tran, activation, FailedActivation(transid)) + val activationResult = makeWhiskActivation(msg, EntityPath(name.id), version, response, interval, limits) + + // send active ack for failed activations + sendActiveAck(tran, activationResult) + + completeTransaction(tran, activationResult, FailedActivation(transid)) } /* @@ -189,7 +199,8 @@ class Invoker( } map { case (failedInit, con, result) => // process the result and send active ack message - val activationResult = sendActiveAck(tran, action, failedInit, result) + val activationResult =
makeActivationResultForSuccess(tran, action, failedInit, result) + sendActiveAck(tran, activationResult) // after sending active ack, drain logs and return container val contents = getContainerLogs(con, action.exec.asInstanceOf[CodeExec[_]].sentinelledLogs, action.limits.logs) @@ -241,26 +252,40 @@ class Invoker( } /** - * Creates WhiskActivation for the "run result" (which could be a failed initialization) and send - * ActiveAck message. + * Creates WhiskActivation for the "run result" (which could be a failed initialization); this + * method is only reached if the action actually ran with no invoker exceptions. * * @return WhiskActivation */ - private def sendActiveAck(tran: Transaction, action: WhiskAction, failedInit: Boolean, result: RunResult)( + private def makeActivationResultForSuccess(tran: Transaction, action: WhiskAction, failedInit: Boolean, result: RunResult)( implicit transid: TransactionId): WhiskActivation = { if (!failedInit) tran.runInterval = Some(result.interval) val msg = tran.msg val activationInterval = computeActivationInterval(tran) val activationResponse = getActivationResponse(activationInterval, action.limits.timeout.duration, result, failedInit) - val activationResult = makeWhiskActivation(msg, EntityPath(action.fullyQualifiedName(false).toString), action.version, activationResponse, activationInterval, Some(action.limits)) - val completeMsg = CompletionMessage(transid, activationResult, this.name) + makeWhiskActivation(msg, EntityPath(action.fullyQualifiedName(false).toString), action.version, activationResponse, activationInterval, Some(action.limits)) + } - producer.send(s"completed${msg.rootControllerIndex.toInt}", completeMsg) map { status => - logging.info(this, s"posted completion of activation ${msg.activationId}") + /** + * Sends ActiveAck message for a completed activation. + * If for some reason posting to the message bus fails, an active ack may not be sent.
+ */ + private def sendActiveAck(tran: Transaction, activationResult: WhiskActivation)( + implicit transid: TransactionId): Unit = { + + def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { + val msg = CompletionMessage(transid, res, this.name) + producer.send(s"completed${tran.msg.rootControllerIndex.toInt}", msg).andThen { + case Success(_) => + logging.info(this, s"posted ${if (recovery) "recovery" else ""} completion of activation ${activationResult.activationId}") + } } - activationResult + send(Right(activationResult)).onFailure { + case t if t.getCause.isInstanceOf[RecordTooLargeException] => + send(Left(activationResult.activationId), recovery = true) + } } // The nodeJsAction runner inserts this line in the logs at the end diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index b90db49..d620e3d 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -23,6 +23,8 @@ import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success +import org.apache.kafka.common.errors.RecordTooLargeException + import akka.actor.ActorRef import akka.actor.ActorRefFactory import akka.actor.ActorSystem @@ -107,10 +109,20 @@ class InvokerReactive( } /** Sends an active-ack. 
*/ - val ack = (tid: TransactionId, activation: WhiskActivation, controllerInstance: InstanceId) => { + val ack = (tid: TransactionId, activationResult: WhiskActivation, controllerInstance: InstanceId) => { implicit val transid = tid - producer.send(s"completed${controllerInstance.toInt}", CompletionMessage(tid, activation, s"invoker${instance.toInt}")).andThen { - case Success(_) => logging.info(this, s"posted completion of activation ${activation.activationId}") + + def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { + val msg = CompletionMessage(transid, res, this.name) + producer.send(s"completed${controllerInstance.toInt}", msg).andThen { + case Success(_) => + logging.info(this, s"posted ${if (recovery) "recovery" else ""} completion of activation ${activationResult.activationId}") + } + } + + send(Right(activationResult)).recoverWith { + case t if t.getCause.isInstanceOf[RecordTooLargeException] => + send(Left(activationResult.activationId), recovery = true) } } diff --git a/tests/src/test/scala/whisk/common/SchedulerTests.scala b/tests/src/test/scala/whisk/common/SchedulerTests.scala index 1e3df96..cc2a56b 100644 --- a/tests/src/test/scala/whisk/common/SchedulerTests.scala +++ b/tests/src/test/scala/whisk/common/SchedulerTests.scala @@ -117,6 +117,21 @@ class SchedulerTests callCount shouldBe 1 } + it should "log scheduler halt message with tid" in { + implicit val transid = TransactionId.testing + val msg = "test threw an exception" + + stream.reset() + val scheduled = Scheduler.scheduleWaitAtLeast(timeBetweenCalls) { () => + throw new Exception(msg) + } + + waitForCalls() + stream.toString.split(" ").drop(1).mkString(" ") shouldBe { + s"[ERROR] [#sid_1] [Scheduler] halted because $msg\n" + } + } + it should "not stop the scheduler if the future from the closure is failed" in { var callCount = 0 @@ -146,10 +161,54 @@ class SchedulerTests val differences = calculateDifferences(calls) withClue(s"expecting all $differences to 
be <= $timeBetweenCalls") { + differences should not be 'empty differences.forall(_ <= timeBetweenCalls + schedulerSlack) } } + it should "delay initial schedule by given duration" in { + val timeBetweenCalls = 200 milliseconds + val initialDelay = 1.second + var callCount = 0 + + val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, initialDelay) { () => + callCount += 1 + Future successful true + } + + try { + Thread.sleep(initialDelay.toMillis) + callCount should be <= 1 + + Thread.sleep(2 * timeBetweenCalls.toMillis) + callCount should be > 1 + } finally { + scheduled ! PoisonPill + } + } + + it should "perform work immediately when requested" in { + val timeBetweenCalls = 200 milliseconds + val initialDelay = 1.second + var callCount = 0 + + val scheduled = Scheduler.scheduleWaitAtMost(timeBetweenCalls, initialDelay) { () => + callCount += 1 + Future successful true + } + + try { + Thread.sleep(2 * timeBetweenCalls.toMillis) + callCount should be(0) + + scheduled ! Scheduler.WorkOnceNow + Thread.sleep(timeBetweenCalls.toMillis) + callCount should be(1) + } finally { + scheduled ! PoisonPill + } + } + it should "not wait when the closure takes longer than the interval" in { val calls = Buffer[Instant]() val timeBetweenCalls = 200 milliseconds @@ -165,6 +224,7 @@ class SchedulerTests val differences = calculateDifferences(calls) withClue(s"expecting all $differences to be <= $computationTime") { + differences should not be 'empty differences.forall(_ <= computationTime + schedulerSlack) } } diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala new file mode 100644 index 0000000..01f0282 --- /dev/null +++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.connector.tests + +import java.time.Instant + +import scala.util.Success +import scala.concurrent.duration.DurationInt + +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import spray.json.DefaultJsonProtocol.StringJsonFormat +import spray.json._ +import whisk.common.TransactionId +import whisk.core.connector.CompletionMessage +import whisk.core.entity._ +import whisk.core.entity.size.SizeInt + +/** + * Unit tests for the CompletionMessage object. 
+ */ +@RunWith(classOf[JUnitRunner]) +class CompletionMessageTests extends FlatSpec with Matchers { + + behavior of "completion message" + + val activation = WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = ActivationId(), + start = Instant.now(), + end = Instant.now(), + response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))), + annotations = Parameters("limits", ActionLimits( + TimeLimit(1.second), + MemoryLimit(128.MB), + LogLimit(1.MB)).toJson), + duration = Some(123)) + + it should "serialize a left completion message" in { + val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), "xyz") + m.serialize shouldBe JsObject( + "transid" -> m.transid.toJson, + "response" -> m.response.left.get.toJson, + "invoker" -> m.invoker.toJson).compactPrint + } + + it should "serialize a right completion message" in { + val m = CompletionMessage(TransactionId.testing, Right(activation), "xyz") + m.serialize shouldBe JsObject( + "transid" -> m.transid.toJson, + "response" -> m.response.right.get.toJson, + "invoker" -> m.invoker.toJson).compactPrint + } + + it should "deserialize a left completion message" in { + val m = CompletionMessage(TransactionId.testing, Left(ActivationId()), "xyz") + CompletionMessage.parse(m.serialize) shouldBe Success(m) + } + + it should "deserialize a right completion message" in { + val m = CompletionMessage(TransactionId.testing, Right(activation), "xyz") + CompletionMessage.parse(m.serialize) shouldBe Success(m) + } +} diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index 8d8dc1d..4cc0590 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -201,7 +201,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: 
ExecutionC override def getActiveUserActivationCounts: Map[String, Int] = Map() - override def publish(action: WhiskAction, msg: ActivationMessage, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Future[WhiskActivation]] = + override def publish(action: WhiskAction, msg: ActivationMessage)(implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = Future.successful { whiskActivationStub map { case (timeout, activation) => Future { @@ -210,7 +210,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC Thread.sleep(timeout.toMillis) println(".... done waiting") } - activation + Right(activation) } } getOrElse Future.failed(new IllegalArgumentException("Unit test does not need fast path")) } diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala index 1d3e932..9a4dc92 100644 --- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala @@ -260,8 +260,13 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi } } - override protected[controller] def invokeAction(user: Identity, action: WhiskAction, payload: Option[JsObject], blocking: Boolean, waitOverride: Option[FiniteDuration] = None)( - implicit transid: TransactionId): Future[(ActivationId, Option[WhiskActivation])] = { + override protected[controller] def invokeAction( + user: Identity, + action: WhiskAction, + payload: Option[JsObject], + waitForResponse: Option[FiniteDuration], + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { invocationCount = invocationCount + 1 if (failActivation == 0) { @@ -304,9 +309,9 @@ trait WebActionsApiTests extends ControllerTestCommon with BeforeAndAfterEach wi } action.parameters.get("z") shouldBe 
defaultActionParameters.get("z") - Future.successful(activation.activationId, Some(activation)) + Future.successful(Right(activation)) } else if (failActivation == 1) { - Future.successful(ActivationId(), None) + Future.successful(Left(ActivationId())) } else { Future.failed(new IllegalStateException("bad activation")) } diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala index 5784970..ee43dbe 100644 --- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala +++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala @@ -120,27 +120,39 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers { } } - it should "succeed but truncate result, if result exceeds its limit" in withAssetCleaner(wskprops) { - (wp, assetHelper) => - val name = "TestActionCausingExcessiveResult" - assetHelper.withCleaner(wsk.action, name) { - val actionName = TestUtils.getTestActionFilename("sizedResult.js") - (action, _) => action.create(name, Some(actionName)) - } + Seq(true, false).foreach { blocking => + it should s"succeed but truncate result, if result exceeds its limit (blocking: $blocking)" in withAssetCleaner(wskprops) { + (wp, assetHelper) => + val name = "TestActionCausingExcessiveResult" + assetHelper.withCleaner(wsk.action, name) { + val actionName = TestUtils.getTestActionFilename("sizedResult.js") + (action, _) => action.create(name, Some(actionName), timeout = Some(15.seconds)) + } - val allowedSize = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes - val run = wsk.action.invoke(name, Map("size" -> (allowedSize + 1).toJson, "char" -> "a".toJson)) - withActivation(wsk.activation, run) { activation => - val response = activation.response - response.success shouldBe false - response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError) - val msg = response.result.get.fields(ActivationResponse.ERROR_FIELD).convertTo[String] 
- val expected = Messages.truncatedResponse((allowedSize + 10).B, allowedSize.B) - withClue(s"is: ${msg.take(expected.length)}\nexpected: $expected") { - msg.startsWith(expected) shouldBe true + val allowedSize = ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes + + def checkResponse(activation: CliActivation) = { + val response = activation.response + response.success shouldBe false + response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError) + val msg = response.result.get.fields(ActivationResponse.ERROR_FIELD).convertTo[String] + val expected = Messages.truncatedResponse((allowedSize + 10).B, allowedSize.B) + withClue(s"is: ${msg.take(expected.length)}\nexpected: $expected") { + msg.startsWith(expected) shouldBe true + } + msg.endsWith("a") shouldBe true } - msg.endsWith("a") shouldBe true - } + + // this tests an active ack failure to post from invoker + val args = Map("size" -> (allowedSize + 1).toJson, "char" -> "a".toJson) + val code = if (blocking) TestUtils.APP_ERROR else TestUtils.SUCCESS_EXIT + val rr = wsk.action.invoke(name, args, blocking = blocking, expectedExitCode = code) + if (blocking) { + checkResponse(wsk.parseJsonString(rr.stderr).convertTo[CliActivation]) + } else { + withActivation(wsk.activation, rr) { checkResponse(_) } + } + } } it should "succeed with one log line" in withAssetCleaner(wskprops) { @@ -245,7 +257,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers { (action, _) => action.create(name, Some(actionName), memory = Some(allowedMemory)) } - for( a <- 1 to 10){ + for (a <- 1 to 10) { val run = wsk.action.invoke(name, Map("payload" -> "128".toJson)) withActivation(wsk.activation, run) { response => response.response.status shouldBe "success" -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.