This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new c69b6f5 Send active-ack in any case of a parseable message. (#3424) c69b6f5 is described below commit c69b6f5488122705ef75c42e0a0c82ab65c7075c Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Tue Mar 13 13:20:50 2018 +0100 Send active-ack in any case of a parseable message. (#3424) --- .../src/main/scala/whisk/http/ErrorResponse.scala | 2 + .../scala/whisk/core/invoker/InvokerReactive.scala | 193 +++++++++++---------- .../whisk/core/invoker/NamespaceBlacklist.scala | 3 - 3 files changed, 101 insertions(+), 97 deletions(-) diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index 0f466d3..97d2008 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -205,6 +205,8 @@ object Messages { } } + val namespacesBlacklisted = "The action was not invoked due to a blacklisted namespace." + val actionRemovedWhileInvoking = "Action could not be found or may have been deleted." val actionMismatchWhileInvoking = "Action version is not compatible and cannot be invoked." val actionFetchErrorWhileInvoking = "Action could not be fetched." diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index afe0c89..0729103 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -37,7 +37,7 @@ import whisk.core.entity.size._ import whisk.http.Messages import whisk.spi.SpiLoader -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -46,8 +46,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa logging: Logging) { implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val ec = actorSystem.dispatcher - implicit val cfg = config + implicit val ec: ExecutionContext = actorSystem.dispatcher + implicit val cfg: WhiskConfig = config private val logsProvider = SpiLoader.get[LogStoreProvider].logStore(actorSystem) logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}") @@ -59,7 +59,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa * task or actor because further operation does not make sense if something * goes wrong here. Initialization will throw an exception upon failure. */ - val containerFactory = + private val containerFactory = SpiLoader .get[ContainerFactoryProvider] .getContainerFactory( @@ -90,26 +90,26 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Initialize message consumers */ - val topic = s"invoker${instance.toInt}" - val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt - val msgProvider = SpiLoader.get[MessagingProvider] - val consumer = msgProvider.getConsumer( + private val topic = s"invoker${instance.toInt}" + private val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt + private val msgProvider = SpiLoader.get[MessagingProvider] + private val consumer = msgProvider.getConsumer( config, topic, topic, maximumContainers, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) - val activationFeed = actorSystem.actorOf(Props { + private val activationFeed = actorSystem.actorOf(Props { new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage) }) /** Sends an active-ack. */ - val ack = (tid: TransactionId, - activationResult: WhiskActivation, - blockingInvoke: Boolean, - controllerInstance: InstanceId) => { - implicit val transid = tid + private val ack = (tid: TransactionId, + activationResult: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: InstanceId) => { + implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { val msg = CompletionMessage(transid, res, instance) @@ -129,8 +129,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Stores an activation in the database. */ - val store = (tid: TransactionId, activation: WhiskActivation) => { - implicit val transid = tid + private val store = (tid: TransactionId, activation: WhiskActivation) => { + implicit val transid: TransactionId = tid logging.debug(this, "recording the activation result to the data store") WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen { case Success(id) => logging.debug(this, s"recorded activation") @@ -139,18 +139,16 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Creates a ContainerProxy Actor when being called. */ - val childFactory = (f: ActorRefFactory) => + private val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance)) - val prewarmKind = "nodejs:6" - val prewarmExec = ExecManifest.runtimesManifest + private val prewarmKind = "nodejs:6" + private val prewarmExec = ExecManifest.runtimesManifest .resolveDefaultRuntime(prewarmKind) - .map { manifest => - new CodeExecAsString(manifest, "", None) - } + .map(manifest => CodeExecAsString(manifest, "", None)) .get - val pool = actorSystem.actorOf( + private val pool = actorSystem.actorOf( ContainerPool.props( childFactory, maximumContainers, @@ -163,92 +161,99 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) .flatMap(Future.fromTry) .flatMap { msg => + // The message has been parsed correctly, thus the following code needs to *always* produce at least an + // active-ack. + + implicit val transid: TransactionId = msg.transid + if (!namespaceBlacklist.isBlacklisted(msg.user)) { - Future.successful(msg) - } else { - Future.failed(NamespaceBlacklistedException(msg.user.namespace.name)) - } - } - .filter(_.action.version.isDefined) - .flatMap { msg => - implicit val transid = msg.transid + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) + val namespace = msg.action.path + val name = msg.action.name + val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) + val subject = msg.user.subject - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - val namespace = msg.action.path - val name = msg.action.name - val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) - val subject = msg.user.subject + logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") - logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") + // caching is enabled since actions have revision id and an updated + // action will not hit in the cache due to change in the revision id; + // if the doc revision is missing, then bypass cache + if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") - // caching is enabled since actions have revision id and an updated - // action will not hit in the cache due to change in the revision id; - // if the doc revision is missing, then bypass cache - if (actionid.rev == DocRevision.empty) { - logging.warn(this, s"revision was not provided for ${actionid.id}") - } + WhiskAction + .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) + .flatMap { action => + action.toExecutableWhiskAction match { + case Some(executable) => + pool ! Run(executable, msg) + Future.successful(()) + case None => + logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") + Future.failed(new IllegalStateException("non-executable action reached the invoker")) + } + } + .recoverWith { + case t => + // If the action cannot be found, the user has concurrently deleted it, + // making this an application error. All other errors are considered system + // errors and should cause the invoker to be considered unhealthy. + val response = t match { + case _: NoDocumentException => + ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) + case _: DocumentTypeMismatchException | _: DocumentUnreadable => + ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) + case _ => + ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } - WhiskAction - .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) - .flatMap { action => - action.toExecutableWhiskAction match { - case Some(executable) => - pool ! Run(executable, msg) + val activation = generateFallbackActivation(msg, response) + activationFeed ! MessageFeed.Processed + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex) + store(msg.transid, activation) Future.successful(()) - case None => - logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") - Future.failed(new IllegalStateException("non-executable action reached the invoker")) } - } - .recoverWith { - case t => - // If the action cannot be found, the user has concurrently deleted it, - // making this an application error. All other errors are considered system - // errors and should cause the invoker to be considered unhealthy. - val response = t match { - case _: NoDocumentException => - ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) - case _: DocumentTypeMismatchException | _: DocumentUnreadable => - ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) - case _ => - ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) - } - val now = Instant.now - val causedBy = if (msg.causedBySequence) { - Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) - } else None - val activation = WhiskActivation( - activationId = msg.activationId, - namespace = msg.user.namespace.toPath, - subject = msg.user.subject, - cause = msg.cause, - name = msg.action.name, - version = msg.action.version.getOrElse(SemVer()), - start = now, - end = now, - duration = Some(0), - response = response, - annotations = { - Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.asString)) ++ causedBy - }) - - activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex) - store(msg.transid, activation) - Future.successful(()) - } + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + activationFeed ! MessageFeed.Processed + val activation = + generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) + ack(msg.transid, activation, false, msg.rootControllerIndex) + logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.") + Future.successful(()) + } } .recoverWith { case t => // Iff everything above failed, we have a terminal error at hand. Either the message failed // to deserialize, or something threw an error where it is not expected to throw. activationFeed ! MessageFeed.Processed - t match { - case nse: NamespaceBlacklistedException => logging.warn(this, nse.getMessage) - case _ => logging.error(this, s"terminal failure while processing message: $t") - } + logging.error(this, s"terminal failure while processing message: $t") Future.successful(()) } } + /** Generates an activation with zero runtime. Usually used for error cases */ + private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = { + val now = Instant.now + val causedBy = if (msg.causedBySequence) { + Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) + } else None + + WhiskActivation( + activationId = msg.activationId, + namespace = msg.user.namespace.toPath, + subject = msg.user.subject, + cause = msg.cause, + name = msg.action.name, + version = msg.action.version.getOrElse(SemVer()), + start = now, + end = now, + duration = Some(0), + response = response, + annotations = { + Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.asString)) ++ causedBy + }) + } + } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala index 9909c82..4f4336e 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala @@ -73,6 +73,3 @@ object NamespaceBlacklist { /** Configuration relevant to the namespace blacklist */ case class NamespaceBlacklistConfig(pollInterval: FiniteDuration) - -/** Indicates the activation was stopped due to a blacklisted identity */ -case class NamespaceBlacklistedException(ns: String) extends Exception(s"Namespace $ns was blocked in invoker.") -- To stop receiving notification emails like this one, please contact cbic...@apache.org.