markusthoemmes commented on a change in pull request #3202: Support action 
continuations in the controller
URL: 
https://github.com/apache/incubator-openwhisk/pull/3202#discussion_r167939781
 
 

 ##########
 File path: 
core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 ##########
 @@ -140,6 +200,349 @@ protected[actions] trait PrimitiveActions {
     }
   }
 
+  /**
+   * Mutable cumulative accounting of what happened during the execution of a 
composition.
+   *
+   * Compositions are aborted if the number of action invocations exceeds a 
limit.
+   * The permitted max is n component invocations plus 2n+1 conductor 
invocations (where n is the actionSequenceLimit).
+   * The max is chosen to permit a sequence with up to n primitive actions.
+   *
+   * NOTE:
+   * A sequence invocation counts as one invocation irrespective of the number 
of action invocations in the sequence.
+   * If one component of a composition is also a composition, the caller and 
callee share the same accounting object.
+   * The counts are shared between callers and callees so the limit applies 
globally.
+   *
+   * @param components the current count of component actions already invoked
+   * @param conductors the current count of conductor actions already invoked
+   */
+  private case class CompositionAccounting(var components: Int = 0, var 
conductors: Int = 0)
+
+  /**
+   * A mutable session object to keep track of the execution of one 
composition.
+   *
+   * NOTE:
+   * The session object is not shared between callers and callees.
+   *
+   * @param activationId the activationId for the composition (ie the 
activation record for the composition)
+   * @param start the start time for the composition
+   * @param action the conductor action responsible for the execution of the 
composition
+   * @param cause the cause of the composition (activationId of the enclosing 
sequence or composition if any)
+   * @param duration the "user" time so far executing the composition (sum of 
durations for
+   *        all actions invoked so far which is different from the total time 
spent executing the composition)
+   * @param maxMemory the maximum memory annotation observed so far for the 
conductor action and components
+   * @param state the json state object to inject in the parameter object of 
the next conductor invocation
+   * @param accounting the global accounting object used to abort compositions 
requiring too many action invocations
+   * @param logs a mutable buffer that is appended with new activation ids as 
the composition unfolds
+   *             (in contrast with sequences, the logs of a hierarchy of 
compositions is not flattened)
+   */
+  private case class Session(activationId: ActivationId,
+                             start: Instant,
+                             action: ExecutableWhiskActionMetaData,
+                             cause: Option[ActivationId],
+                             var duration: Long,
+                             var maxMemory: Int,
+                             var state: Option[JsObject],
+                             accounting: CompositionAccounting,
+                             logs: Buffer[ActivationId])
+
+  /**
+   * A method that knows how to invoke a composition.
+   *
+   * The method instantiates the session object for the composition and 
invokes the conductor action.
+   * It waits for the activation response, synthesizes the activation record 
and writes it to the datastore.
+   * It distinguishes nested, blocking and non-blocking invokes, returning 
either the activation or the activation id.
+   *
+   * @param user the identity invoking the action
+   * @param action the conductor action to invoke for the composition
+   * @param payload the dynamic arguments for the activation
+   * @param waitForResponse if not empty, wait upto specified duration for a 
response (this is used for blocking activations)
+   * @param cause the activation id that is responsible for this 
invoke/activation
+   * @param accounting the accounting object for the caller if any
+   * @param transid a transaction id for logging
+   * @return a promise that completes with one of the following successful 
cases:
+   *            Right(WhiskActivation) if waiting for a response and response 
is ready within allowed duration,
+   *            Left(ActivationId) if not waiting for a response, or allowed 
duration has elapsed without a result ready
+   */
+  private def invokeComposition(user: Identity,
+                                action: ExecutableWhiskActionMetaData,
+                                payload: Option[JsObject],
+                                waitForResponse: Option[FiniteDuration],
+                                cause: Option[ActivationId],
+                                accounting: Option[CompositionAccounting] = 
None)(
+    implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
+
+    val session = Session(
+      activationId = activationIdFactory.make(),
+      start = Instant.now(Clock.systemUTC()),
+      action,
+      cause,
+      duration = 0,
+      maxMemory = action.limits.memory.megabytes,
+      state = None,
+      accounting = accounting.getOrElse(CompositionAccounting()), // share 
accounting with caller
+      logs = Buffer.empty)
+
+    val response: Future[Either[ActivationId, WhiskActivation]] =
+      invokeConductor(user, payload, session).map(response => 
Right(completeActivation(user, session, response)))
+
+    // is caller waiting for the result of the activation?
+    cause
+      .map(_ => response) // ignore waitForResponse when not topmost
+      .orElse(
+        // blocking invoke, wait until timeout
+        waitForResponse.map(response.withAlternativeAfterTimeout(_, 
Future.successful(Left(session.activationId)))))
+      .getOrElse(
+        // no, return the session id
+        Future.successful(Left(session.activationId)))
+  }
+
+  /**
+   * A method that knows how to handle a conductor invocation.
+   *
+   * This method prepares the payload and invokes the conductor action.
+   * It parses the result and extracts the name of the next component action 
if any.
+   * It either invokes the desired component action or completes the 
composition invocation.
+   * It also checks the invocation counts against the limits.
+   *
+   * @param user the identity invoking the action
+   * @param payload the dynamic arguments for the activation
+   * @param session the session object for this composition
+   * @param transid a transaction id for logging
+   */
+  private def invokeConductor(user: Identity, payload: Option[JsObject], 
session: Session)(
+    implicit transid: TransactionId): Future[ActivationResponse] = {
+
+    if (session.accounting.conductors > 2 * actionSequenceLimit) {
+      // composition is too long
+      
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
+    } else {
+      // inject state into payload if any
+      val params = session.state
+        .map(state => Some(JsObject(payload.getOrElse(JsObject()).fields ++ 
state.fields)))
+        .getOrElse(payload)
+
+      // invoke conductor action
+      session.accounting.conductors += 1
+      val activationResponse =
+        invokeSimpleAction(
+          user,
+          action = session.action,
+          payload = params,
+          waitForResponse = Some(session.action.limits.timeout.duration + 
1.minute), // wait for result
+          cause = Some(session.activationId)) // cause is session id
+
+      waitForActivation(user, session, activationResponse).flatMap {
+        case Left(response) => // unsuccessful invocation, return error 
response
+          Future.successful(response)
+        case Right(activation) => // successful invocation
+          val result = activation.resultAsJson
+
+          // extract params from result, auto boxing result if not a dictionary
+          val params = result.fields.get(WhiskActivation.paramsField).map {
+            case obj: JsObject => obj
+            case value         => JsObject(WhiskActivation.valueField -> value)
+          }
+
+          // update session state, auto boxing state if not a dictionary
+          session.state = result.fields.get(WhiskActivation.stateField).map {
+            case obj: JsObject => obj
+            case value         => JsObject(WhiskActivation.stateField -> value)
+          }
+
+          // extract next action from result and invoke
+          result.fields.get(WhiskActivation.actionField) match {
+            case None =>
+              // no next action, end composition execution, return to caller
+              
Future.successful(ActivationResponse(activation.response.statusCode, 
Some(params.getOrElse(result))))
+            case Some(next) =>
+              Try(next.convertTo[EntityPath]) match {
+                case Failure(t) =>
+                  // parsing failure
+                  
Future.successful(ActivationResponse.applicationError(compositionComponentInvalid(next)))
+                case Success(_) if session.accounting.components >= 
actionSequenceLimit =>
+                  // composition is too long
+                  
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
+                case Success(next) =>
+                  // resolve and invoke next action
+                  val fqn = (if (next.defaultPackage) 
EntityPath.DEFAULT.addPath(next) else next)
+                    .resolveNamespace(user.namespace)
+                    .toFullyQualifiedEntityName
+                  val resource = Resource(fqn.path, 
Collection(Collection.ACTIONS), Some(fqn.name.asString))
+                  entitlementProvider
+                    .check(user, Privilege.ACTIVATE, Set(resource), noThrottle 
= true)
+                    .flatMap { _ =>
+                      // successful entitlement check
+                      WhiskActionMetaData
+                        .resolveActionAndMergeParameters(entityStore, fqn)
+                        .flatMap {
+                          case next =>
+                            // successful resolution
+                            invokeComponent(user, action = next, payload = 
params, session)
+                        }
+                        .recover {
+                          case _ =>
+                            // resolution failure
+                            
ActivationResponse.applicationError(compositionComponentNotFound(next.asString))
+                        }
+                    }
+                    .recover {
+                      case _ =>
+                        // failed entitlement check
+                        
ActivationResponse.applicationError(compositionComponentNotAccessible(next.asString))
+                    }
+              }
+          }
+      }
+    }
+
+  }
+
+  /**
+   * A method that knows how to handle a component invocation.
+   *
+   * This method distinguishes primitive actions, sequences, and compositions.
+   * The conductor action is reinvoked after the successful invocation of the 
component.
+   *
+   * @param user the identity invoking the action
+   * @param action the component action to invoke
+   * @param payload the dynamic arguments for the activation
+   * @param session the session object for this composition
+   * @param transid a transaction id for logging
+   */
+  private def invokeComponent(user: Identity, action: WhiskActionMetaData, 
payload: Option[JsObject], session: Session)(
+    implicit transid: TransactionId): Future[ActivationResponse] = {
+
+    val exec = action.toExecutableWhiskAction
+    val activationResponse: Future[Either[ActivationId, WhiskActivation]] = 
exec match {
+      case Some(action) if 
action.annotations.isTruthy(WhiskActivation.conductorAnnotation) => // 
composition
+        // invokeComposition will increase the invocation counts
+        invokeComposition(
+          user,
+          action,
+          payload,
+          waitForResponse = None, // not topmost, hence blocking, no need for 
timeout
+          cause = Some(session.activationId),
+          accounting = Some(session.accounting))
+      case Some(action) => // primitive action
+        session.accounting.components += 1
+        invokeSimpleAction(
+          user,
+          action,
+          payload,
+          waitForResponse = Some(action.limits.timeout.duration + 1.minute),
+          cause = Some(session.activationId))
+      case None => // sequence
+        session.accounting.components += 1
+        val SequenceExecMetaData(components) = action.exec
+        invokeSequence(
+          user,
+          action,
+          components,
+          payload,
+          waitForOutermostResponse = None,
+          cause = Some(session.activationId),
+          topmost = false,
+          atomicActionsCount = 0).map(r => r._1)
+    }
+
+    waitForActivation(user, session, activationResponse).flatMap {
+      case Left(response) => // unsuccessful invocation, return error response
+        Future.successful(response)
+      case Right(activation) => // reinvoke conductor on component result
+        invokeConductor(user, payload = Some(activation.resultAsJson), session 
= session)
+    }
+  }
+
+  /**
+   * Waits for a response from a conductor of component action invocation.
+   * Handles internal errors (activation failure or timeout).
+   * Logs the activation id and updates the duration and max memory for the 
session.
+   * Returns the activation record if successful, the error response if not.
+   *
+   * @param user the identity invoking the action
+   * @param session the session object for this composition
+   * @param activationResponse the future activation to wait on
+   * @param transid a transaction id for logging
+   */
+  private def waitForActivation(user: Identity,
+                                session: Session,
+                                activationResponse: 
Future[Either[ActivationId, WhiskActivation]])(
+    implicit transid: TransactionId): Future[Either[ActivationResponse, 
WhiskActivation]] = {
+
+    activationResponse
+      .map {
+        case Left(activationId) => // invocation timeout
+          session.logs += activationId
+          
Left(ActivationResponse.whiskError(compositionActivationTimeout(activationId)))
+        case Right(activation) => // successful invocation
+          session.logs += activation.activationId
+          // activation.duration should be defined but this is not reflected 
by the type so be defensive
+          session.duration += 
activation.duration.getOrElse(activation.end.toEpochMilli - 
activation.start.toEpochMilli)
+          activation.annotations.get("limits").foreach { limitsAnnotation =>
+            limitsAnnotation.asJsObject.getFields("memory") match {
+              case Seq(JsNumber(memory)) =>
+                session.maxMemory = Math.max(session.maxMemory, memory.toInt)
 
 Review comment:
   I believe this does not ignore the cases that fail to match 
`Seq(JsNumber(_))`; instead it throws a `MatchError`.
   
   You're looking for:
   
   ```scala
   activation.annotations.get("limits")
     .flatMap(_.asJsObject.fields.get("memory"))
     .flatMap(memory => Try(memory.convertTo[Int]).toOption)
     .foreach(memory => session.maxMemory = memory max session.maxMemory)
   ```
   
   That way you can be relatively sure it doesn't throw (although `asJsObject` can still throw).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to