This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c69b6f5  Send active-ack in any case of a parseable message. (#3424)
c69b6f5 is described below

commit c69b6f5488122705ef75c42e0a0c82ab65c7075c
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Tue Mar 13 13:20:50 2018 +0100

    Send active-ack in any case of a parseable message. (#3424)
---
 .../src/main/scala/whisk/http/ErrorResponse.scala  |   2 +
 .../scala/whisk/core/invoker/InvokerReactive.scala | 193 +++++++++++----------
 .../whisk/core/invoker/NamespaceBlacklist.scala    |   3 -
 3 files changed, 101 insertions(+), 97 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 0f466d3..97d2008 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -205,6 +205,8 @@ object Messages {
     }
   }
 
+  val namespacesBlacklisted = "The action was not invoked due to a blacklisted 
namespace."
+
   val actionRemovedWhileInvoking = "Action could not be found or may have been 
deleted."
   val actionMismatchWhileInvoking = "Action version is not compatible and 
cannot be invoked."
   val actionFetchErrorWhileInvoking = "Action could not be fetched."
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index afe0c89..0729103 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -37,7 +37,7 @@ import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
@@ -46,8 +46,8 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   logging: Logging) {
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
-  implicit val ec = actorSystem.dispatcher
-  implicit val cfg = config
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val cfg: WhiskConfig = config
 
   private val logsProvider = 
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
   logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
@@ -59,7 +59,7 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
    * task or actor because further operation does not make sense if something
    * goes wrong here. Initialization will throw an exception upon failure.
    */
-  val containerFactory =
+  private val containerFactory =
     SpiLoader
       .get[ContainerFactoryProvider]
       .getContainerFactory(
@@ -90,26 +90,26 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Initialize message consumers */
-  val topic = s"invoker${instance.toInt}"
-  val maximumContainers = config.invokerNumCore.toInt * 
config.invokerCoreShare.toInt
-  val msgProvider = SpiLoader.get[MessagingProvider]
-  val consumer = msgProvider.getConsumer(
+  private val topic = s"invoker${instance.toInt}"
+  private val maximumContainers = config.invokerNumCore.toInt * 
config.invokerCoreShare.toInt
+  private val msgProvider = SpiLoader.get[MessagingProvider]
+  private val consumer = msgProvider.getConsumer(
     config,
     topic,
     topic,
     maximumContainers,
     maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
-  val activationFeed = actorSystem.actorOf(Props {
+  private val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed("activation", logging, consumer, maximumContainers, 
500.milliseconds, processActivationMessage)
   })
 
   /** Sends an active-ack. */
-  val ack = (tid: TransactionId,
-             activationResult: WhiskActivation,
-             blockingInvoke: Boolean,
-             controllerInstance: InstanceId) => {
-    implicit val transid = tid
+  private val ack = (tid: TransactionId,
+                     activationResult: WhiskActivation,
+                     blockingInvoke: Boolean,
+                     controllerInstance: InstanceId) => {
+    implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = 
false) = {
       val msg = CompletionMessage(transid, res, instance)
@@ -129,8 +129,8 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Stores an activation in the database. */
-  val store = (tid: TransactionId, activation: WhiskActivation) => {
-    implicit val transid = tid
+  private val store = (tid: TransactionId, activation: WhiskActivation) => {
+    implicit val transid: TransactionId = tid
     logging.debug(this, "recording the activation result to the data store")
     WhiskActivation.put(activationStore, activation)(tid, notifier = 
None).andThen {
       case Success(id) => logging.debug(this, s"recorded activation")
@@ -139,18 +139,16 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Creates a ContainerProxy Actor when being called. */
-  val childFactory = (f: ActorRefFactory) =>
+  private val childFactory = (f: ActorRefFactory) =>
     f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, 
store, logsProvider.collectLogs, instance))
 
-  val prewarmKind = "nodejs:6"
-  val prewarmExec = ExecManifest.runtimesManifest
+  private val prewarmKind = "nodejs:6"
+  private val prewarmExec = ExecManifest.runtimesManifest
     .resolveDefaultRuntime(prewarmKind)
-    .map { manifest =>
-      new CodeExecAsString(manifest, "", None)
-    }
+    .map(manifest => CodeExecAsString(manifest, "", None))
     .get
 
-  val pool = actorSystem.actorOf(
+  private val pool = actorSystem.actorOf(
     ContainerPool.props(
       childFactory,
       maximumContainers,
@@ -163,92 +161,99 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
     Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
       .flatMap(Future.fromTry)
       .flatMap { msg =>
+        // The message has been parsed correctly, thus the following code 
needs to *always* produce at least an
+        // active-ack.
+
+        implicit val transid: TransactionId = msg.transid
+
         if (!namespaceBlacklist.isBlacklisted(msg.user)) {
-          Future.successful(msg)
-        } else {
-          Future.failed(NamespaceBlacklistedException(msg.user.namespace.name))
-        }
-      }
-      .filter(_.action.version.isDefined)
-      .flatMap { msg =>
-        implicit val transid = msg.transid
+          val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, 
logLevel = InfoLevel)
+          val namespace = msg.action.path
+          val name = msg.action.name
+          val actionid = FullyQualifiedEntityName(namespace, 
name).toDocId.asDocInfo(msg.revision)
+          val subject = msg.user.subject
 
-        val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, 
logLevel = InfoLevel)
-        val namespace = msg.action.path
-        val name = msg.action.name
-        val actionid = FullyQualifiedEntityName(namespace, 
name).toDocId.asDocInfo(msg.revision)
-        val subject = msg.user.subject
+          logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
 
-        logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
+          // caching is enabled since actions have revision id and an updated
+          // action will not hit in the cache due to change in the revision id;
+          // if the doc revision is missing, then bypass cache
+          if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision 
was not provided for ${actionid.id}")
 
-        // caching is enabled since actions have revision id and an updated
-        // action will not hit in the cache due to change in the revision id;
-        // if the doc revision is missing, then bypass cache
-        if (actionid.rev == DocRevision.empty) {
-          logging.warn(this, s"revision was not provided for ${actionid.id}")
-        }
+          WhiskAction
+            .get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty)
+            .flatMap { action =>
+              action.toExecutableWhiskAction match {
+                case Some(executable) =>
+                  pool ! Run(executable, msg)
+                  Future.successful(())
+                case None =>
+                  logging.error(this, s"non-executable action reached the 
invoker ${action.fullyQualifiedName(false)}")
+                  Future.failed(new IllegalStateException("non-executable 
action reached the invoker"))
+              }
+            }
+            .recoverWith {
+              case t =>
+                // If the action cannot be found, the user has concurrently 
deleted it,
+                // making this an application error. All other errors are 
considered system
+                // errors and should cause the invoker to be considered 
unhealthy.
+                val response = t match {
+                  case _: NoDocumentException =>
+                    
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+                  case _: DocumentTypeMismatchException | _: 
DocumentUnreadable =>
+                    
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
+                  case _ =>
+                    
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
+                }
 
-        WhiskAction
-          .get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty)
-          .flatMap { action =>
-            action.toExecutableWhiskAction match {
-              case Some(executable) =>
-                pool ! Run(executable, msg)
+                val activation = generateFallbackActivation(msg, response)
+                activationFeed ! MessageFeed.Processed
+                ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex)
+                store(msg.transid, activation)
                 Future.successful(())
-              case None =>
-                logging.error(this, s"non-executable action reached the 
invoker ${action.fullyQualifiedName(false)}")
-                Future.failed(new IllegalStateException("non-executable action 
reached the invoker"))
             }
-          }
-          .recoverWith {
-            case t =>
-              // If the action cannot be found, the user has concurrently 
deleted it,
-              // making this an application error. All other errors are 
considered system
-              // errors and should cause the invoker to be considered 
unhealthy.
-              val response = t match {
-                case _: NoDocumentException =>
-                  
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
-                case _: DocumentTypeMismatchException | _: DocumentUnreadable 
=>
-                  
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
-                case _ =>
-                  
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
-              }
-              val now = Instant.now
-              val causedBy = if (msg.causedBySequence) {
-                Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
-              } else None
-              val activation = WhiskActivation(
-                activationId = msg.activationId,
-                namespace = msg.user.namespace.toPath,
-                subject = msg.user.subject,
-                cause = msg.cause,
-                name = msg.action.name,
-                version = msg.action.version.getOrElse(SemVer()),
-                start = now,
-                end = now,
-                duration = Some(0),
-                response = response,
-                annotations = {
-                  Parameters(WhiskActivation.pathAnnotation, 
JsString(msg.action.asString)) ++ causedBy
-                })
-
-              activationFeed ! MessageFeed.Processed
-              ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex)
-              store(msg.transid, activation)
-              Future.successful(())
-          }
+        } else {
+          // Iff the current namespace is blacklisted, an active-ack is only 
produced to keep the loadbalancer protocol
+          // Due to the protective nature of the blacklist, a database entry 
is not written.
+          activationFeed ! MessageFeed.Processed
+          val activation =
+            generateFallbackActivation(msg, 
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
+          ack(msg.transid, activation, false, msg.rootControllerIndex)
+          logging.warn(this, s"namespace ${msg.user.namespace} was blocked in 
invoker.")
+          Future.successful(())
+        }
       }
       .recoverWith {
         case t =>
           // Iff everything above failed, we have a terminal error at hand. 
Either the message failed
           // to deserialize, or something threw an error where it is not 
expected to throw.
           activationFeed ! MessageFeed.Processed
-          t match {
-            case nse: NamespaceBlacklistedException => logging.warn(this, 
nse.getMessage)
-            case _                                  => logging.error(this, 
s"terminal failure while processing message: $t")
-          }
+          logging.error(this, s"terminal failure while processing message: $t")
           Future.successful(())
       }
   }
 
+  /** Generates an activation with zero runtime. Usually used for error cases 
*/
+  private def generateFallbackActivation(msg: ActivationMessage, response: 
ActivationResponse): WhiskActivation = {
+    val now = Instant.now
+    val causedBy = if (msg.causedBySequence) {
+      Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
+    } else None
+
+    WhiskActivation(
+      activationId = msg.activationId,
+      namespace = msg.user.namespace.toPath,
+      subject = msg.user.subject,
+      cause = msg.cause,
+      name = msg.action.name,
+      version = msg.action.version.getOrElse(SemVer()),
+      start = now,
+      end = now,
+      duration = Some(0),
+      response = response,
+      annotations = {
+        Parameters(WhiskActivation.pathAnnotation, 
JsString(msg.action.asString)) ++ causedBy
+      })
+  }
+
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
index 9909c82..4f4336e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
@@ -73,6 +73,3 @@ object NamespaceBlacklist {
 
 /** Configuration relevant to the namespace blacklist */
 case class NamespaceBlacklistConfig(pollInterval: FiniteDuration)
-
-/** Indicates the activation was stopped due to a blacklisted identity */
-case class NamespaceBlacklistedException(ns: String) extends 
Exception(s"Namespace $ns was blocked in invoker.")

-- 
To stop receiving notification emails like this one, please contact
cbic...@apache.org.

Reply via email to