rabbah commented on a change in pull request #3202: Support action continuations in the controller
URL: https://github.com/apache/incubator-openwhisk/pull/3202#discussion_r164464673
########## File path: core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala ########## @@ -139,6 +199,356 @@ protected[actions] trait PrimitiveActions { } } + /** + * Mutable cumulative accounting of what happened during the execution of a composition. + * + * Compositions are aborted if the number of action invocations exceeds a limit. + * The permitted max is n component invocations plus 2n+1 conductor invocations (where n is the actionSequenceLimit). + * The max is chosen to permit a sequence with up to n primitive actions. + * + * NOTE: + * A sequence invocation counts as one invocation irrespective of the number of action invocations in the sequence. + * If one component of a composition is also a composition, the caller and callee share the same accounting object. + * The counts are shared between callers and callees so the limit applies globally. + * + * @param components the current count of component actions already invoked + * @param conductors the current count of conductor actions already invoked + */ + private case class CompositionAccounting(var components: Int = 0, var conductors: Int = 0) + + /** + * A mutable session object to keep track of the execution of one composition. + * + * NOTE: + * The session object is not shared between callers and callees. + * A callee has a reference to the session object for the caller. + * This permits the callee to return to the caller when done. 
+ * + * @param activationId the activationId for the composition (ie the activation record for the composition) + * @param start the start time for the composition + * @param action the conductor action responsible for the execution of the composition + * @param cause the cause of the composition (activationId of the enclosing sequence or composition if any) + * @param duration the "user" time so far executing the composition (sum of durations for + * all actions invoked so far which is different from the total time spent executing the composition) + * @param maxMemory the maximum memory annotation observed so far for the conductor action and components + * @param state the json state object to inject in the parameter object of the next conductor invocation + * @param accounting the global accounting object used to abort compositions requiring too many action invocations + * @param logs a mutable buffer that is appended with new activation ids as the composition unfolds + * (in contrast with sequences, the logs of a hierarchy of compositions is not flattened) + * @param caller the session object for the parent composition (caller) if any + * @param result a placeholder for returning the result of the composition invocation (blocking invocation only) + */ + private case class Session(activationId: ActivationId, + start: Instant, + action: ExecutableWhiskActionMetaData, + cause: Option[ActivationId], + var duration: Long, + var maxMemory: Int, + var state: Option[JsObject], + accounting: CompositionAccounting, + logs: Buffer[ActivationId], + caller: Option[Session], + result: Option[Promise[Either[ActivationId, WhiskActivation]]]) + + /** + * A method that knows how to invoke a composition. + * + * The method instantiates the session object for the composition, invokes the conductor action for the composition, + * and waits for the composition result (resulting activation) if the invocation is blocking (up to timeout). 
+ * + * @param user the identity invoking the action + * @param action the conductor action to invoke for the composition + * @param payload the dynamic arguments for the activation + * @param waitForResponse if not empty, wait upto specified duration for a response (this is used for blocking activations) + * @param cause the activation id that is responsible for this invoke/activation + * @param caller the session object for the caller if any + * @param transid a transaction id for logging + * @return a promise that completes with one of the following successful cases: + * Right(WhiskActivation) if waiting for a response and response is ready within allowed duration, + * Left(ActivationId) if not waiting for a response, or allowed duration has elapsed without a result ready + * or an error + */ + private def invokeComposition( + user: Identity, + action: ExecutableWhiskActionMetaData, + payload: Option[JsObject], + waitForResponse: Option[FiniteDuration], + cause: Option[ActivationId], + caller: Option[Session] = None)(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { + + val session = Session( + activationId = activationIdFactory.make(), + start = Instant.now(Clock.systemUTC()), + action, + cause, + duration = 0, + maxMemory = action.limits.memory.megabytes, + state = None, + accounting = caller.map { _.accounting }.getOrElse(CompositionAccounting()), // share accounting with caller + logs = Buffer.empty, + caller, + result = waitForResponse.map { _ => + Promise[Either[ActivationId, WhiskActivation]]() // placeholder for result if blocking invoke + }) + + invokeConductor(user, payload, session) + + // is caller waiting for the result of the activation? 
+ waitForResponse + .map { timeout => + // handle timeout + session.result.head.future + .withAlternativeAfterTimeout(timeout, Future.successful(Left(session.activationId))) + } + .getOrElse { + // no, return the session id + Future.successful(Left(session.activationId)) + } + } + + /** + * A method that knows how to handle a conductor invocation. + * + * This method prepares the payload and invokes the conductor action. + * It parses the result and extracts the name of the next component action if any. + * It either invokes the desired component action or completes the composition invocation. + * It also checks the invocation counts against the limits. + * + * @param user the identity invoking the action + * @param payload the dynamic arguments for the activation + * @param session the session object for this composition + * @param transid a transaction id for logging + */ + private def invokeConductor(user: Identity, payload: Option[JsObject], session: Session)( + implicit transid: TransactionId) = { + + if (session.accounting.conductors > 2 * actionSequenceLimit) { + // composition is too long + val response = ActivationResponse.applicationError(compositionIsTooLong) + completeAppActivation(user, session, response) + } else { + // inject state into payload if any + val params = session.state + .map { state => + Some(JsObject(payload.getOrElse(JsObject()).fields ++ state.fields)) + } + .getOrElse(payload) + + // invoke conductor action + session.accounting.conductors += 1 + val response = + invokeSimpleAction( + user, + session.action, + params, + Some(session.action.limits.timeout.duration + 1.minute), // wait for result + Some(session.activationId)) // cause is session id + + response.onComplete { + case Failure(t) => + // invocation failure + val response = ActivationResponse.whiskError(compositionActivationFailure) + completeAppActivation(user, session, response) + case Success(Left(activationId)) => + // invocation timeout + session.logs += activationId + 
val response = ActivationResponse.whiskError(compositionActivationTimeout(activationId)) + completeAppActivation(user, session, response) + case Success(Right(activation)) => + // successful invocation + session.logs += activation.activationId + session.duration += activation.duration.getOrElse(activation.end.toEpochMilli - activation.start.toEpochMilli) + + val result = activation.resultAsJson + + // extract params from result + val params = result.getFields(WhiskActivation.paramsField).headOption.map { p => + Try(p.asJsObject).getOrElse(JsObject(WhiskActivation.valueField -> p)) // ensure params is a dictionary + } + + // update session state + session.state = result.getFields(WhiskActivation.stateField).headOption.flatMap { p => + Try(Some(p.asJsObject)).getOrElse(None) + } + + // extract next action from result and invoke + result.getFields(WhiskActivation.actionField).headOption match { + case None => + // no next action, end composition execution, return to caller + val response = ActivationResponse(activation.response.statusCode, Some(params.getOrElse(result))) + completeAppActivation(user, session, response) + case Some(next) => + Try(next.convertTo[EntityPath]) match { + case Failure(t) => + // parsing failure + val response = ActivationResponse.applicationError(compositionComponentInvalid(next)) + completeAppActivation(user, session, response) + case Success(next) => + // resolve and invoke next action + val fqn = (if (next.defaultPackage) EntityPath.DEFAULT.addPath(next) else next) + .resolveNamespace(user.namespace) + .toFullyQualifiedEntityName + val resource = Resource(fqn.path, Collection(Collection.ACTIONS), Some(fqn.name.asString)) + entitlementProvider.check(user, Privilege.ACTIVATE, Set(resource), noThrottle = true).onComplete { + case Failure(t) => + // failed entitlement check + val response = + ActivationResponse.applicationError(compositionComponentNotAccessible(next.toString)) + completeAppActivation(user, session, response) + case 
Success(_) => + // successful entitlement check + WhiskActionMetaData.resolveActionAndMergeParameters(entityStore, fqn).onComplete { + case Failure(t) => + // resolution failure + val response = + ActivationResponse.applicationError(compositionComponentNotFound(next.toString)) + completeAppActivation(user, session, response) + case Success(next) => + if (session.accounting.components >= actionSequenceLimit) { + // composition is too long + val response = ActivationResponse.applicationError(compositionIsTooLong) + completeAppActivation(user, session, response) + } else { + // successful resolution + invokeComponent(user, next, params, session) + } + } + } + } + } + } + } + } + + /** + * A method that knows how to handle a component invocation. + * + * This method distinguishes primitive actions, sequences, and compositions. + * If the component action is a composition, it invokes invokeComposition and returns. + * invokeComposition takes care of resuming the execution of this composition, when the nested composition finishes. + * If the component action is not a composition, it is invoked followed by the reinvocation of the conductor action. + * This method also keeps track of the duration and memory footprint for the composition. 
+ * + * @param user the identity invoking the action + * @param action the component action to invoke + * @param payload the dynamic arguments for the activation + * @param session the session object for this composition + * @param transid a transaction id for logging + */ + private def invokeComponent(user: Identity, action: WhiskActionMetaData, payload: Option[JsObject], session: Session)( + implicit transid: TransactionId) { + + val exec = action.toExecutableWhiskAction + if (action.annotations.get("conductor").isDefined && exec.isDefined) { + // composition + invokeComposition(user, exec.head, payload, None, Some(session.activationId), Some(session)) // non-blocking + // invokeComposition will take care of accounting and continuations + } else { + session.accounting.components += 1 + exec + .map { exec => + // not a sequence, not a composition + invokeSimpleAction( + user, + exec, + payload, + Some(action.limits.timeout.duration + 1.minute), + Some(session.activationId)) + } + .getOrElse { + // sequence + val SequenceExecMetaData(components) = action.exec + invokeSequence(user, action, components, payload, None, Some(session.activationId), false, 0).map(r => r._1) + } + .onComplete { + case Failure(t) => + val response = ActivationResponse.whiskError(compositionActivationFailure) + completeAppActivation(user, session, response) + case Success(Left(activationId)) => + session.logs += activationId + val response = ActivationResponse.whiskError(compositionActivationTimeout(activationId)) + completeAppActivation(user, session, response) + case Success(Right(activation)) => + session.logs += activation.activationId + session.duration += activation.duration.getOrElse( + activation.end.toEpochMilli - activation.start.toEpochMilli) + activation.annotations.get("limits") map { limitsAnnotation => + limitsAnnotation.asJsObject.getFields("memory") match { + case Seq(JsNumber(memory)) => + session.maxMemory = Math.max(session.maxMemory, memory.toInt) + } + } + + // invoke 
continuation + invokeConductor(user, Some(activation.resultAsJson), session) + } + } + } + + /** + * Creates an activation for a composition and writes it back to the datastore. + * Completes the associated promise if any. + * Resumes caller if any. + */ + private def completeAppActivation(user: Identity, session: Session, response: ActivationResponse)( + implicit transid: TransactionId): Unit = { + + // compute max memory + val sequenceLimits = Parameters( + WhiskActivation.limitsAnnotation, + ActionLimits(session.action.limits.timeout, MemoryLimit(session.maxMemory MB), session.action.limits.logs).toJson) + + // set causedBy if not topmost + val causedBy = if (session.cause.isDefined) { + Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) + } else { + None + } + + val end = Instant.now(Clock.systemUTC()) + + // create the whisk activation + val activation = WhiskActivation( + namespace = user.namespace.toPath, + name = session.action.name, + user.subject, + activationId = session.activationId, + start = session.start, + end = end, + cause = session.cause, + response = response, + logs = ActivationLogs(session.logs.map(_.asString).toVector), + version = session.action.version, + publish = false, + annotations = Parameters(WhiskActivation.topmostAnnotation, JsBoolean(!session.cause.isDefined)) ++ + Parameters(WhiskActivation.pathAnnotation, JsString(session.action.fullyQualifiedName(false).asString)) ++ + Parameters(WhiskActivation.kindAnnotation, JsString(Exec.SEQUENCE)) ++ Review comment: using the same annotation as a sequence is nice here given there is a conductor annotation below which will permit a client to disambiguate a static sequence vs a dynamic trace. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services