This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 8327cd0 Allow for activation store to accept user and request information (#3798) 8327cd0 is described below commit 8327cd06224a43b5cb91c8cbb13703d64f58026f Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Mon Aug 20 12:33:35 2018 -0400 Allow for activation store to accept user and request information (#3798) * Allow for activation store to accept user and request information * Make user and request non-optional parameters * Introduce UserContext to activation store * Update doc for new parameters --- .../logging/DockerToActivationLogStore.scala | 4 +- .../logging/ElasticSearchLogStore.scala | 7 +- .../containerpool/logging/LogDriverLogStore.scala | 5 +- .../core/containerpool/logging/LogStore.scala | 4 +- .../containerpool/logging/SplunkLogStore.scala | 4 +- .../whisk/core/database/ActivationStore.scala | 56 +++++++++----- .../core/database/ArtifactActivationStore.scala | 48 +++++++----- .../scala/whisk/core/controller/Activations.scala | 59 +++++++------- .../scala/whisk/core/controller/Triggers.scala | 89 ++++++++++++---------- .../core/controller/actions/PrimitiveActions.scala | 16 ++-- .../core/controller/actions/SequenceActions.scala | 6 +- .../whisk/core/containerpool/ContainerProxy.scala | 12 ++- .../scala/whisk/core/invoker/InvokerReactive.scala | 8 +- .../logging/ElasticSearchLogStoreTests.scala | 15 ++-- .../logging/SplunkLogStoreTests.scala | 9 ++- .../containerpool/test/ContainerProxyTests.scala | 3 +- .../core/controller/test/ActionsApiTests.scala | 10 ++- .../core/controller/test/ActivationsApiTests.scala | 50 ++++++------ .../controller/test/ControllerTestCommon.scala | 33 ++++---- .../core/controller/test/TriggersApiTests.scala | 9 ++- 20 files changed, 253 insertions(+), 194 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala index 520e51e..f0ab49f 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala @@ -19,7 +19,6 @@ package whisk.core.containerpool.logging import akka.NotUsed import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpRequest import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Flow @@ -29,6 +28,7 @@ import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import whisk.http.Messages +import whisk.core.database.UserContext import scala.concurrent.{ExecutionContext, Future} @@ -67,7 +67,7 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore { override val containerParameters = Map("--log-driver" -> Set("json-file")) /* As logs are already part of the activation record, just return that bit of it */ - override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = + override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = Future.successful(activation.logs) override def collectLogs(transid: TransactionId, diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala index b404a24..58da7fa 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchLogStore.scala @@ -26,6 +26,7 @@ import akka.http.scaladsl.model._ import whisk.core.entity.{ActivationLogs, Identity, WhiskActivation} import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._ import whisk.core.ConfigKeys +import whisk.core.database.UserContext import scala.concurrent.{Future, Promise} import scala.util.Try @@ -99,12 +100,12 @@ class ElasticSearchLogStore( private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.namespace.uuid.asString) - override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = { - val headers = extractRequiredHeaders(request.headers) + override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = { + val headers = extractRequiredHeaders(context.request.headers) // Return logs from ElasticSearch, or return logs from activation if required headers are not present if (headers.length == elasticSearchConfig.requiredHeaders.length) { - esClient.search[EsSearchResult](generatePath(user), generatePayload(activation), headers).flatMap { + esClient.search[EsSearchResult](generatePath(context.user), generatePayload(activation), headers).flatMap { case Right(queryResult) => Future.successful(transcribeLogs(queryResult)) case Left(code) => diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala index efbbb80..daa3acb 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala @@ -18,12 +18,11 @@ package whisk.core.containerpool.logging import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpRequest -import whisk.core.entity.Identity import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} +import whisk.core.database.UserContext import scala.concurrent.Future @@ -49,7 +48,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore { /** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version, * e.g. the SplunkLogStore to expose logs from some external source */ - def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = + def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = Future.successful(ActivationLogs(Vector("Logs are not available."))) } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala index 2bc3f98..29433d9 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala @@ -18,12 +18,12 @@ package whisk.core.containerpool.logging import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpRequest import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import whisk.spi.Spi +import whisk.core.database.UserContext import scala.concurrent.Future @@ -76,7 +76,7 @@ trait LogStore { * @param activation activation to fetch the logs for * @return the relevant logs */ - def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] + def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] } trait LogStoreProvider extends Spi { diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala index 5baef46..1cd93b5 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala @@ -53,7 +53,7 @@ import whisk.common.AkkaLogging import whisk.core.ConfigKeys import whisk.core.entity.ActivationLogs import whisk.core.entity.WhiskActivation -import whisk.core.entity.Identity +import whisk.core.database.UserContext case class SplunkLogStoreConfig(host: String, port: Int, @@ -98,7 +98,7 @@ class SplunkLogStore( Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true)))) else Http().defaultClientHttpsContext) - override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = { + override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = { //example curl request: // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00" diff --git a/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala b/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala index 106f57f..9a821c9 100644 --- a/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/ActivationStore.scala @@ -21,6 +21,7 @@ import java.time.Instant import akka.actor.ActorSystem import akka.stream.ActorMaterializer +import akka.http.scaladsl.model.HttpRequest import spray.json.JsObject import whisk.common.{Logging, TransactionId} import whisk.core.entity._ @@ -28,38 +29,45 @@ import whisk.spi.Spi import scala.concurrent.Future +case class UserContext(user: Identity, request: HttpRequest = HttpRequest()) + trait ActivationStore { /** * Stores an activation. * * @param activation activation to store + * @param context user and request context * @param transid transaction ID for request * @param notifier cache change notifier * @return Future containing DocInfo related to stored activation */ - def store(activation: WhiskActivation)(implicit transid: TransactionId, - notifier: Option[CacheChangeNotification]): Future[DocInfo] + def store(activation: WhiskActivation, context: UserContext)( + implicit transid: TransactionId, + notifier: Option[CacheChangeNotification]): Future[DocInfo] /** * Retrieves an activation corresponding to the specified activation ID. * * @param activationId ID of activation to retrieve + * @param context user and request context * @param transid transaction ID for request * @return Future containing the retrieved WhiskActivation */ - def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation] + def get(activationId: ActivationId, context: UserContext)(implicit transid: TransactionId): Future[WhiskActivation] /** * Deletes an activation corresponding to the provided activation ID. * * @param activationId ID of activation to delete + * @param context user and request context * @param transid transaction ID for the request * @param notifier cache change notifier * @return Future containing a Boolean value indication whether the activation was deleted */ - def delete(activationId: ActivationId)(implicit transid: TransactionId, - notifier: Option[CacheChangeNotification]): Future[Boolean] + def delete(activationId: ActivationId, context: UserContext)( + implicit transid: TransactionId, + notifier: Option[CacheChangeNotification]): Future[Boolean] /** * Counts the number of activations in a namespace. @@ -69,6 +77,7 @@ trait ActivationStore { * @param skip number of activations to skip * @param since timestamp to retrieve activations after * @param upto timestamp to retrieve activations before + * @param context user and request context * @param transid transaction ID for request * @return Future containing number of activations returned from query in JSON format */ @@ -76,7 +85,8 @@ trait ActivationStore { name: Option[EntityPath] = None, skip: Int, since: Option[Instant] = None, - upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject] + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[JsObject] /** * Returns activations corresponding to provided entity name. @@ -88,18 +98,20 @@ trait ActivationStore { * @param includeDocs return document with each activation * @param since timestamp to retrieve activations after * @param upto timestamp to retrieve activations before + * @param context user and request context * @param transid transaction ID for request * @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs * are included, a List of WhiskActivation is returned. */ - def listActivationsMatchingName(namespace: EntityPath, - name: EntityPath, - skip: Int, - limit: Int, - includeDocs: Boolean = false, - since: Option[Instant] = None, - upto: Option[Instant] = None)( - implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] + def listActivationsMatchingName( + namespace: EntityPath, + name: EntityPath, + skip: Int, + limit: Int, + includeDocs: Boolean = false, + since: Option[Instant] = None, + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] /** * List all activations in a specified namespace. @@ -110,17 +122,19 @@ trait ActivationStore { * @param includeDocs return document with each activation * @param since timestamp to retrieve activations after * @param upto timestamp to retrieve activations before + * @param context user and request context * @param transid transaction ID for request * @return When docs are not included, a Future containing a List of activations in JSON format is returned. When docs * are included, a List of WhiskActivation is returned. */ - def listActivationsInNamespace(namespace: EntityPath, - skip: Int, - limit: Int, - includeDocs: Boolean = false, - since: Option[Instant] = None, - upto: Option[Instant] = None)( - implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] + def listActivationsInNamespace( + namespace: EntityPath, + skip: Int, + limit: Int, + includeDocs: Boolean = false, + since: Option[Instant] = None, + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] } trait ActivationStoreProvider extends Spi { diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala index 15da34b..d6f5ad5 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactActivationStore.scala @@ -36,8 +36,9 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor private val artifactStore: ArtifactStore[WhiskActivation] = WhiskActivationStore.datastore()(actorSystem, logging, actorMaterializer) - def store(activation: WhiskActivation)(implicit transid: TransactionId, - notifier: Option[CacheChangeNotification]): Future[DocInfo] = { + def store(activation: WhiskActivation, context: UserContext)( + implicit transid: TransactionId, + notifier: Option[CacheChangeNotification]): Future[DocInfo] = { logging.debug(this, s"recording activation '${activation.activationId}'") @@ -54,7 +55,8 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor res } - def get(activationId: ActivationId)(implicit transid: TransactionId): Future[WhiskActivation] = { + def get(activationId: ActivationId, context: UserContext)( + implicit transid: TransactionId): Future[WhiskActivation] = { WhiskActivation.get(artifactStore, DocId(activationId.asString)) } @@ -62,8 +64,9 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor * Here there is added overhead of retrieving the specified activation before deleting it, so this method should not * be used in production or performance related code. */ - def delete(activationId: ActivationId)(implicit transid: TransactionId, - notifier: Option[CacheChangeNotification]): Future[Boolean] = { + def delete(activationId: ActivationId, context: UserContext)( + implicit transid: TransactionId, + notifier: Option[CacheChangeNotification]): Future[Boolean] = { WhiskActivation.get(artifactStore, DocId(activationId.asString)) flatMap { doc => WhiskActivation.del(artifactStore, doc.docinfo) } @@ -73,7 +76,8 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor name: Option[EntityPath] = None, skip: Int, since: Option[Instant] = None, - upto: Option[Instant] = None)(implicit transid: TransactionId): Future[JsObject] = { + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[JsObject] = { WhiskActivation.countCollectionInNamespace( artifactStore, name.map(p => namespace.addPath(p)).getOrElse(namespace), @@ -84,14 +88,15 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor name.map(_ => WhiskActivation.filtersView).getOrElse(WhiskActivation.view)) } - def listActivationsMatchingName(namespace: EntityPath, - name: EntityPath, - skip: Int, - limit: Int, - includeDocs: Boolean = false, - since: Option[Instant] = None, - upto: Option[Instant] = None)( - implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = { + def listActivationsMatchingName( + namespace: EntityPath, + name: EntityPath, + skip: Int, + limit: Int, + includeDocs: Boolean = false, + since: Option[Instant] = None, + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = { WhiskActivation.listActivationsMatchingName( artifactStore, namespace, @@ -104,13 +109,14 @@ class ArtifactActivationStore(actorSystem: ActorSystem, actorMaterializer: Actor StaleParameter.UpdateAfter) } - def listActivationsInNamespace(namespace: EntityPath, - skip: Int, - limit: Int, - includeDocs: Boolean = false, - since: Option[Instant] = None, - upto: Option[Instant] = None)( - implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = { + def listActivationsInNamespace( + namespace: EntityPath, + skip: Int, + limit: Int, + includeDocs: Boolean = false, + since: Option[Instant] = None, + upto: Option[Instant] = None, + context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = { WhiskActivation.listCollectionInNamespace( artifactStore, namespace, diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala index fb4ab54..82ec062 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala @@ -36,6 +36,7 @@ import whisk.core.entitlement.{Collection, Privilege, Resource} import whisk.core.entity._ import whisk.http.ErrorResponse.terminate import whisk.http.Messages +import whisk.core.database.UserContext object WhiskActivationsApi { @@ -119,18 +120,21 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit /** Dispatches resource to the proper handler depending on context. */ protected override def dispatchOp(user: Identity, op: Privilege, resource: Resource)( implicit transid: TransactionId) = { - - resource.entity.flatMap(e => ActivationId.parse(e).toOption) match { - case Some(aid) => - op match { - case READ => fetch(user, resource.namespace, aid) - case _ => reject // should not get here - } - case None => - op match { - case READ => list(resource.namespace) - case _ => reject // should not get here - } + extractRequest { request => + val context = UserContext(user, request) + + resource.entity.flatMap(e => ActivationId.parse(e).toOption) match { + case Some(aid) => + op match { + case READ => fetch(context, resource.namespace, aid) + case _ => reject // should not get here + } + case None => + op match { + case READ => list(context, resource.namespace) + case _ => reject // should not get here + } + } } } @@ -141,7 +145,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 200 [] or [WhiskActivation as JSON] * - 500 Internal Server Error */ - private def list(namespace: EntityPath)(implicit transid: TransactionId) = { + private def list(context: UserContext, namespace: EntityPath)(implicit transid: TransactionId) = { import WhiskActivationsApi.stringToRestrictedEntityPath import WhiskActivationsApi.stringToInstantDeserializer import WhiskActivationsApi.stringToListLimit @@ -157,16 +161,16 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit 'upto.as[Instant] ?) { (skip, limit, count, docs, name, since, upto) => if (count && !docs) { countEntities { - activationStore.countActivationsInNamespace(namespace, name.flatten, skip.n, since, upto) + activationStore.countActivationsInNamespace(namespace, name.flatten, skip.n, since, upto, context) } } else if (count && docs) { terminate(BadRequest, Messages.docsNotAllowedWithCount) } else { val activations = name.flatten match { case Some(action) => - activationStore.listActivationsMatchingName(namespace, action, skip.n, limit.n, docs, since, upto) + activationStore.listActivationsMatchingName(namespace, action, skip.n, limit.n, docs, since, upto, context) case None => - activationStore.listActivationsInNamespace(namespace, skip.n, limit.n, docs, since, upto) + activationStore.listActivationsInNamespace(namespace, skip.n, limit.n, docs, since, upto, context) } listEntities(activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson)))) } @@ -181,16 +185,15 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 404 Not Found * - 500 Internal Server Error */ - private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)( + private def fetch(context: UserContext, namespace: EntityPath, activationId: ActivationId)( implicit transid: TransactionId) = { val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId)) pathEndOrSingleSlash { getEntity( - activationStore.get(ActivationId(docid.asString)), + activationStore.get(ActivationId(docid.asString), context), postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson))) - - } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~ - (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) } + } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(context, docid) } ~ + (pathPrefix(logsPath) & pathEnd) { fetchLogs(context, docid) } } /** @@ -201,9 +204,9 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 404 Not Found * - 500 Internal Server Error */ - private def fetchResponse(docid: DocId)(implicit transid: TransactionId) = { + private def fetchResponse(context: UserContext, docid: DocId)(implicit transid: TransactionId) = { getEntityAndProject( - activationStore.get(ActivationId(docid.asString)), + activationStore.get(ActivationId(docid.asString), context), (activation: WhiskActivation) => Future.successful(activation.response.toExtendedJson)) } @@ -215,11 +218,9 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 404 Not Found * - 500 Internal Server Error */ - private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = { - extractRequest { request => - getEntityAndProject( - activationStore.get(ActivationId(docid.asString)), - (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject)) - } + private def fetchLogs(context: UserContext, docid: DocId)(implicit transid: TransactionId) = { + getEntityAndProject( + activationStore.get(ActivationId(docid.asString), context), + (activation: WhiskActivation) => logStore.fetchLogs(activation, context).map(_.toJsonObject)) } } diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala index d293aba..a4d2ecf 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala @@ -45,6 +45,7 @@ import whisk.core.entity._ import whisk.core.entity.types.EntityStore import whisk.http.ErrorResponse import whisk.http.Messages +import whisk.core.database.UserContext /** A trait implementing the triggers API. */ trait WhiskTriggersApi extends WhiskCollectionAPI { @@ -132,48 +133,52 @@ trait WhiskTriggersApi extends WhiskCollectionAPI { */ override def activate(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])( implicit transid: TransactionId) = { - entity(as[Option[JsObject]]) { payload => - getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some { - trigger: WhiskTrigger => - val triggerActivationId = activationIdFactory.make() - logging.info(this, s"[POST] trigger activation id: ${triggerActivationId}") - val triggerActivation = WhiskActivation( - namespace = user.namespace.name.toPath, // all activations should end up in the one space regardless trigger.namespace, - entityName.name, - user.subject, - triggerActivationId, - Instant.now(Clock.systemUTC()), - Instant.EPOCH, - response = ActivationResponse.success(payload orElse Some(JsObject.empty)), - version = trigger.version, - duration = None) - - // List of active rules associated with the trigger - val activeRules: Map[FullyQualifiedEntityName, ReducedRule] = - trigger.rules.map(_.filter(_._2.status == Status.ACTIVE)).getOrElse(Map.empty) - - if (activeRules.nonEmpty) { - val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject.empty) - - activateRules(user, args, trigger.rules.getOrElse(Map.empty)) - .map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector))) - .recover { - case e => - logging.error(this, s"Failed to write action activation results to trigger activation: $e") - triggerActivation - } - .map { activation => - activationStore.store(activation) - } - complete(Accepted, triggerActivationId.toJsObject) - } else { - logging - .debug( - this, - s"[POST] trigger without an active rule was activated; no trigger activation record created for $entityName") - complete(NoContent) - } - }) + extractRequest { request => + val context = UserContext(user, request) + + entity(as[Option[JsObject]]) { payload => + getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some { + trigger: WhiskTrigger => + val triggerActivationId = activationIdFactory.make() + logging.info(this, s"[POST] trigger activation id: ${triggerActivationId}") + val triggerActivation = WhiskActivation( + namespace = user.namespace.name.toPath, // all activations should end up in the one space regardless trigger.namespace, + entityName.name, + user.subject, + triggerActivationId, + Instant.now(Clock.systemUTC()), + Instant.EPOCH, + response = ActivationResponse.success(payload orElse Some(JsObject.empty)), + version = trigger.version, + duration = None) + + // List of active rules associated with the trigger + val activeRules: Map[FullyQualifiedEntityName, ReducedRule] = + trigger.rules.map(_.filter(_._2.status == Status.ACTIVE)).getOrElse(Map.empty) + + if (activeRules.nonEmpty) { + val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject.empty) + + activateRules(user, args, trigger.rules.getOrElse(Map.empty)) + .map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector))) + .recover { + case e => + logging.error(this, s"Failed to write action activation results to trigger activation: $e") + triggerActivation + } + .map { activation => + activationStore.store(activation, context) + } + complete(Accepted, triggerActivationId.toJsObject) + } else { + logging + .debug( + this, + s"[POST] trigger without an active rule was activated; no trigger activation record created for $entityName") + complete(NoContent) + } + }) + } } } diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index c2ab52d..4a0b0ec 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -34,6 +34,7 @@ import whisk.core.entity.size.SizeInt import whisk.core.entity.types.EntityStore import whisk.http.Messages._ import whisk.utils.ExecutionContextFactory.FutureExtensions +import whisk.core.database.UserContext import scala.collection.mutable.Buffer import scala.concurrent.{ExecutionContext, Future, Promise} @@ -517,6 +518,8 @@ protected[actions] trait PrimitiveActions { private def completeActivation(user: Identity, session: Session, response: ActivationResponse)( implicit transid: TransactionId): WhiskActivation = { + val context = UserContext(user) + // compute max memory val sequenceLimits = Parameters( WhiskActivation.limitsAnnotation, @@ -550,7 +553,7 @@ protected[actions] trait PrimitiveActions { sequenceLimits, duration = Some(session.duration)) - activationStore.store(activation)(transid, notifier = None) + activationStore.store(activation, context)(transid, notifier = None) activation } @@ -569,8 +572,8 @@ protected[actions] trait PrimitiveActions { totalWaitTime: FiniteDuration, activeAckResponse: Future[Either[ActivationId, WhiskActivation]])( implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { + val context = UserContext(user) val result = Promise[Either[ActivationId, WhiskActivation]] - val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.name.toPath, activationId)) logging.debug(this, s"action activation will block for result upto $totalWaitTime") @@ -578,11 +581,11 @@ protected[actions] trait PrimitiveActions { // in case of an incomplete active-ack (record too large for example). activeAckResponse.foreach { case Right(activation) => result.trySuccess(Right(activation)) - case _ => pollActivation(docid, result, i => 1.seconds + (2.seconds * i), maxRetries = 4) + case _ => pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4) } // 2. Poll the database slowly in case the active-ack never arrives - pollActivation(docid, result, _ => 15.seconds) + pollActivation(docid, context, result, _ => 15.seconds) // 3. Timeout forces a fallback to activationId val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId))) @@ -602,15 +605,16 @@ protected[actions] trait PrimitiveActions { * @param result promise to resolve on result. Is also used to abort polling once completed. */ private def pollActivation(docid: DocId, + context: UserContext, result: Promise[Either[ActivationId, WhiskActivation]], wait: Int => FiniteDuration, retries: Int = 0, maxRetries: Int = Int.MaxValue)(implicit transid: TransactionId): Unit = { if (!result.isCompleted && retries < maxRetries) { val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) { - activationStore.get(ActivationId(docid.asString)).onComplete { + activationStore.get(ActivationId(docid.asString), context).onComplete { case Success(activation) => result.trySuccess(Right(activation)) - case Failure(_: NoDocumentException) => pollActivation(docid, result, wait, retries + 1, maxRetries) + case Failure(_: NoDocumentException) => pollActivation(docid, context, result, wait, retries + 1, maxRetries) case Failure(t: Throwable) => result.tryFailure(t) } } diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index cd6c1ea..d38a91a 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -40,6 +40,7 @@ import whisk.core.entity.size.SizeInt import whisk.core.entity.types._ import whisk.http.Messages._ import whisk.utils.ExecutionContextFactory.FutureExtensions +import whisk.core.database.UserContext protected[actions] trait SequenceActions { /** The core collections require backend services to be injected in this trait. */ @@ -150,6 +151,8 @@ protected[actions] trait SequenceActions { start: Instant, cause: Option[ActivationId])( implicit transid: TransactionId): Future[(Right[ActivationId, WhiskActivation], Int)] = { + val context = UserContext(user) + // not topmost, no need to worry about terminating incoming request // Note: the future for the sequence result recovers from all throwable failures futureSeqResult @@ -161,7 +164,8 @@ protected[actions] trait SequenceActions { (Right(seqActivation), accounting.atomicActionCnt) } .andThen { - case Success((Right(seqActivation), _)) => activationStore.store(seqActivation)(transid, notifier = None) + case Success((Right(seqActivation), _)) => + activationStore.store(seqActivation, context)(transid, notifier = None) // This should never happen; in this case, there is no activation record created or stored: // should there be? diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index d04dfb3..92eda45 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -40,6 +40,7 @@ import whisk.http.Messages import akka.event.Logging.InfoLevel import pureconfig.loadConfigOrThrow import whisk.core.ConfigKeys +import whisk.core.database.UserContext // States sealed trait ContainerState @@ -96,7 +97,7 @@ case object RescheduleJob // job is sent back to parent and could not be process class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], - storeActivation: (TransactionId, WhiskActivation) => Future[Any], + storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, poolConfig: ContainerPoolConfig, @@ -156,6 +157,7 @@ class ContainerProxy( case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg) case _ => ActivationResponse.whiskError(Messages.resourceProvisionError) } + val context = UserContext(job.msg.user) // construct an appropriate activation and record it in the datastore, // also update the feed and active ack; the container cleanup is queued // implicitly via a FailureMessage which will be processed later when the state @@ -167,7 +169,7 @@ class ContainerProxy( job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid) - storeActivation(transid, activation) + storeActivation(transid, activation, context) } .flatMap { container => // now attempt to inject the user code and run the action @@ -391,6 +393,8 @@ class ContainerProxy( activation.foreach( sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid)) + val context = UserContext(job.msg.user) + // Adds logs to the raw activation. val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation .flatMap { activation => @@ -415,7 +419,7 @@ class ContainerProxy( } // Storing the record. Entirely asynchronous and not waited upon. - activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _)) + activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context)) // Disambiguate activation errors and transform the Either into a failed/successful Future respectively. activationWithLogs.flatMap { @@ -432,7 +436,7 @@ object ContainerProxy { def props( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], - store: (TransactionId, WhiskActivation) => Future[Any], + store: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, poolConfig: ContainerPoolConfig, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index de9b4bc..634e416 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -36,6 +36,7 @@ import whisk.core.database._ import whisk.core.entity._ import whisk.http.Messages import whisk.spi.SpiLoader +import whisk.core.database.UserContext import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ @@ -160,9 +161,9 @@ class InvokerReactive( } /** Stores an activation in the database. */ - private val store = (tid: TransactionId, activation: WhiskActivation) => { + private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => { implicit val transid: TransactionId = tid - activationStore.store(activation)(tid, notifier = None) + activationStore.store(activation, context)(tid, notifier = None) } /** Creates a ContainerProxy Actor when being called. */ @@ -236,10 +237,11 @@ class InvokerReactive( ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) } + val context = UserContext(msg.user) val activation = generateFallbackActivation(msg, response) activationFeed ! MessageFeed.Processed ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid) - store(msg.transid, activation) + store(msg.transid, activation, context) Future.successful(()) } } else { diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala index 8930a43..0d32b32 100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchLogStoreTests.scala @@ -36,6 +36,7 @@ import pureconfig.error.ConfigReaderException import spray.json._ import whisk.core.entity._ import whisk.core.entity.size._ +import whisk.core.database.UserContext import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} @@ -88,6 +89,7 @@ class ElasticSearchLogStoreTests HttpEntity(ContentTypes.`application/json`, defaultPayload)) private val defaultLogStoreHttpRequest = HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty) + private val defaultContext = UserContext(user, defaultLogStoreHttpRequest) private val expectedLogs = ActivationLogs( Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs")) @@ -127,7 +129,7 @@ class ElasticSearchLogStoreTests Some(testFlow(defaultHttpResponse, defaultHttpRequest)), elasticSearchConfig = defaultConfig) - await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs + await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs } it should "get logs from supplied activation record when required headers are not present" in { @@ -137,7 +139,7 @@ class ElasticSearchLogStoreTests Some(testFlow(defaultHttpResponse, defaultHttpRequest)), elasticSearchConfig = defaultConfigRequiredHeaders) - await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) shouldBe expectedLogs + await(esLogStore.fetchLogs(activation, defaultContext)) shouldBe expectedLogs } it should "get user logs from ElasticSearch when required headers are needed" in { @@ -160,8 +162,9 @@ class ElasticSearchLogStoreTests uri = "https://some.url", headers = List(RawHeader("x-auth-token", authToken), RawHeader("x-auth-project-id", authProjectId)), entity = HttpEntity.Empty) + val context = UserContext(user, requiredHeadersHttpRequest) - await(esLogStore.fetchLogs(user, activation.withoutLogs, requiredHeadersHttpRequest)) shouldBe expectedLogs + await(esLogStore.fetchLogs(activation.withoutLogs, context)) shouldBe expectedLogs } it should "dynamically replace $UUID in request path" in { @@ -177,13 +180,13 @@ class ElasticSearchLogStoreTests Some(testFlow(defaultHttpResponse, httpRequest)), elasticSearchConfig = dynamicPathConfig) - await(esLogStore.fetchLogs(user, activation.withoutLogs, defaultLogStoreHttpRequest)) shouldBe expectedLogs + await(esLogStore.fetchLogs(activation.withoutLogs, defaultContext)) shouldBe expectedLogs } it should "fail to connect to invalid host" in { val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig) - a[Throwable] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) + a[Throwable] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext)) } it should "forward errors from ElasticSearch" in { @@ -194,7 +197,7 @@ class ElasticSearchLogStoreTests Some(testFlow(httpResponse, defaultHttpRequest)), elasticSearchConfig = defaultConfig) - a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(user, activation, defaultLogStoreHttpRequest)) + a[RuntimeException] should be thrownBy await(esLogStore.fetchLogs(activation, defaultContext)) } it should "error when configuration protocol is invalid" in { diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala index e2bd8db..dae47e6 100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala @@ -38,6 +38,7 @@ import pureconfig.error.ConfigReaderException import spray.json._ import whisk.core.entity._ import whisk.core.entity.size._ +import whisk.core.database.UserContext import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} @@ -88,6 +89,8 @@ class SplunkLogStoreTests annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson), duration = Some(123)) + val context = UserContext(user, request) + implicit val ec: ExecutionContext = system.dispatcher implicit val materializer: ActorMaterializer = ActorMaterializer() @@ -144,20 +147,20 @@ class SplunkLogStoreTests it should "find logs based on activation timestamps" in { //use the a flow that asserts the request structure and provides a response in the expected format val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig) - val result = await(splunkStore.fetchLogs(user, activation, request)) + val result = await(splunkStore.fetchLogs(activation, context)) result shouldBe ActivationLogs(Vector("some log message", "some other log message")) } it should "fail to connect to bogus host" in { //use the default http flow with the default bogus-host config val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig) - a[Throwable] should be thrownBy await(splunkStore.fetchLogs(user, activation, request)) + a[Throwable] should be thrownBy await(splunkStore.fetchLogs(activation, context)) } it should "display an error if API cannot be reached" in { //use a flow that generates a 500 response val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig) - a[RuntimeException] should be thrownBy await(splunkStore.fetchLogs(user, activation, request)) + a[RuntimeException] should be thrownBy await(splunkStore.fetchLogs(activation, context)) } } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index e635b5f..a46647a 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -38,6 +38,7 @@ import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import whisk.core.entity._ import whisk.core.entity.size._ import whisk.http.Messages +import whisk.core.database.UserContext import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} @@ -163,7 +164,7 @@ class ContainerProxyTests response } - def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation) => + def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation, context: UserContext) => Future.successful(()) } diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala index 9b54b0a..7bbf820 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala @@ -35,6 +35,7 @@ import whisk.core.entity.size._ import whisk.core.entitlement.Collection import whisk.http.ErrorResponse import whisk.http.Messages +import whisk.core.database.UserContext import java.io.ByteArrayInputStream import java.util.Base64 @@ -59,6 +60,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { behavior of "Actions API" val creds = WhiskAuthHelpers.newIdentity() + val context = UserContext(creds) val namespace = EntityPath(creds.subject.asString) val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}" def aname() = MakeName.next("action_tests") @@ -1202,7 +1204,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { // storing the activation in the db will allow the db polling to retrieve it // the test harness makes sure the activation id observed by the test matches // the one generated by the api handler - storeActivation(activation) + storeActivation(activation, context) try { Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -1217,7 +1219,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { response should be(activation.resultAsJson) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } @@ -1313,7 +1315,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { // storing the activation in the db will allow the db polling to retrieve it // the test harness makes sure the activation id observed by the test matches // the one generated by the api handler - storeActivation(activation) + storeActivation(activation, context) try { Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check { status should be(InternalServerError) @@ -1321,7 +1323,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { response should be(activation.withoutLogs.toExtendedJson) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala index 4b1927e..6810ef2 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala @@ -31,6 +31,7 @@ import whisk.core.entitlement.Collection import whisk.core.entity._ import whisk.core.entity.size._ import whisk.http.{ErrorResponse, Messages} +import whisk.core.database.UserContext /** * Tests Activations API. @@ -51,6 +52,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi behavior of "Activations API" val creds = WhiskAuthHelpers.newIdentity() + val context = UserContext(creds) val namespace = EntityPath(creds.subject.asString) val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}" @@ -93,8 +95,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = Instant.now) }.toList try { - (notExpectedActivations ++ activations).foreach(storeActivation) - waitOnListActivationsInNamespace(namespace, 2) + (notExpectedActivations ++ activations).foreach(storeActivation(_, context)) + waitOnListActivationsInNamespace(namespace, 2, context) whisk.utils.retry { Get(s"$collectionPath") ~> Route.seal(routes(creds)) ~> check { @@ -135,7 +137,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } finally { (notExpectedActivations ++ activations).foreach(activation => - deleteActivation(ActivationId(activation.docid.asString))) + deleteActivation(ActivationId(activation.docid.asString), context)) } } @@ -176,8 +178,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi }.toList try { - (notExpectedActivations ++ activations).foreach(storeActivation) - waitOnListActivationsInNamespace(namespace, 2) + (notExpectedActivations ++ activations).foreach(storeActivation(_, context)) + waitOnListActivationsInNamespace(namespace, 2, context) checkCount("", 2) whisk.utils.retry { @@ -190,7 +192,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } } finally { (notExpectedActivations ++ activations).foreach(activation => - deleteActivation(ActivationId(activation.docid.asString))) + deleteActivation(ActivationId(activation.docid.asString), context)) } } @@ -251,8 +253,8 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = now.plusSeconds(30))) // should match try { - (notExpectedActivations ++ activations).foreach(storeActivation) - waitOnListActivationsInNamespace(namespace, activations.length) + (notExpectedActivations ++ activations).foreach(storeActivation(_, context)) + waitOnListActivationsInNamespace(namespace, activations.length, context) { // get between two time stamps val filter = s"since=${since.toEpochMilli}&upto=${upto.toEpochMilli}" @@ -306,7 +308,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } finally { (notExpectedActivations ++ activations).foreach(activation => - deleteActivation(ActivationId(activation.docid.asString))) + deleteActivation(ActivationId(activation.docid.asString), context)) } } @@ -360,9 +362,13 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi annotations = Parameters("path", s"${namespace.asString}/pkg/xyz")) }.toList try { - (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation) - waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length) - waitOnListActivationsMatchingName(namespace, EntityName("pkg").addPath(EntityName("xyz")), activations.length) + (notExpectedActivations ++ activations ++ activationsInPackage).foreach(storeActivation(_, context)) + waitOnListActivationsMatchingName(namespace, EntityPath("xyz"), activations.length, context) + waitOnListActivationsMatchingName( + namespace, + EntityName("pkg").addPath(EntityName("xyz")), + activations.length, + context) checkCount("name=xyz", activations.length) whisk.utils.retry { @@ -386,7 +392,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } } finally { (notExpectedActivations ++ activations ++ activationsInPackage).foreach(activation => - deleteActivation(ActivationId(activation.docid.asString))) + deleteActivation(ActivationId(activation.docid.asString), context)) } } @@ -474,7 +480,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation) + storeActivation(activation, context) Get(s"$collectionPath/${activation.activationId.asString}") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -495,7 +501,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi status should be(Forbidden) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } @@ -511,7 +517,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation) + storeActivation(activation, context) Get(s"$collectionPath/${activation.activationId.asString}/result") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -519,7 +525,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi response should be(activation.response.toExtendedJson) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } @@ -535,7 +541,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = Instant.now, end = Instant.now) try { - storeActivation(activation) + storeActivation(activation, context) Get(s"$collectionPath/${activation.activationId.asString}/logs") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -543,7 +549,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi response should be(activation.logs.toJsonObject) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } @@ -558,14 +564,14 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi ActivationId.generate(), start = Instant.now, end = Instant.now) - storeActivation(activation) + storeActivation(activation, context) try { Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~> Route.seal(routes(creds)) ~> check { status should be(NotFound) } } finally { - deleteActivation(ActivationId(activation.docid.asString)) + deleteActivation(ActivationId(activation.docid.asString), context) } } @@ -631,7 +637,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi val activation = new BadActivation(namespace, aname(), creds.subject, ActivationId.generate(), Instant.now, Instant.now) - storeActivation(activation) + storeActivation(activation, context) Get(s"$collectionPath/${activation.activationId}") ~> Route.seal(routes(creds)) ~> check { status should be(InternalServerError) diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index b09d1b1..925c47f 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -42,6 +42,7 @@ import whisk.core.entity._ import whisk.core.entity.test.ExecHelpers import whisk.core.loadBalancer.LoadBalancer import whisk.spi.SpiLoader +import whisk.core.database.UserContext protected trait ControllerTestCommon extends FlatSpec @@ -96,32 +97,34 @@ protected trait ControllerTestCommon }, dbOpTimeout) } - def getActivation(activationId: ActivationId)(implicit transid: TransactionId, - timeout: Duration = 10 seconds): WhiskActivation = { - Await.result(activationStore.get(activationId), timeout) + def getActivation(activationId: ActivationId, context: UserContext)( + implicit transid: TransactionId, + timeout: Duration = 10 seconds): WhiskActivation = { + Await.result(activationStore.get(activationId, context), timeout) } - def storeActivation(activation: WhiskActivation)(implicit transid: TransactionId, - timeout: Duration = 10 seconds): DocInfo = { - val docFuture = activationStore.store(activation) + def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId, + timeout: Duration = 10 seconds): DocInfo = { + val docFuture = activationStore.store(activation, context) val doc = Await.result(docFuture, timeout) assert(doc != null) doc } - def deleteActivation(activationId: ActivationId)(implicit transid: TransactionId) = { - val res = Await.result(activationStore.delete(activationId), dbOpTimeout) + def deleteActivation(activationId: ActivationId, context: UserContext)(implicit transid: TransactionId) = { + val res = Await.result(activationStore.delete(activationId, context), dbOpTimeout) assert(res, true) res } - def waitOnListActivationsInNamespace(namespace: EntityPath, count: Int)(implicit context: ExecutionContext, - transid: TransactionId, - timeout: Duration) = { + def waitOnListActivationsInNamespace(namespace: EntityPath, count: Int, context: UserContext)( + implicit ec: ExecutionContext, + transid: TransactionId, + timeout: Duration) = { val success = retry( () => { val activations: Future[Either[List[JsObject], List[WhiskActivation]]] = - activationStore.listActivationsInNamespace(namespace, 0, 0) + activationStore.listActivationsInNamespace(namespace, 0, 0, context = context) val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson))) listFuture map { l => @@ -135,14 +138,14 @@ protected trait ControllerTestCommon assert(success.isSuccess, "wait aborted") } - def waitOnListActivationsMatchingName(namespace: EntityPath, name: EntityPath, count: Int)( - implicit context: ExecutionContext, + def waitOnListActivationsMatchingName(namespace: EntityPath, name: EntityPath, count: Int, context: UserContext)( + implicit ex: ExecutionContext, transid: TransactionId, timeout: Duration) = { val success = retry( () => { val activations: Future[Either[List[JsObject], List[WhiskActivation]]] = - activationStore.listActivationsMatchingName(namespace, name, 0, 0) + activationStore.listActivationsMatchingName(namespace, name, 0, 0, context = context) val listFuture: Future[List[JsObject]] = activations map (_.fold((js) => js, (wa) => wa.map(_.toExtendedJson))) listFuture map { l => diff --git a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala index cfdda7e..415d224 100644 --- a/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/TriggersApiTests.scala @@ -25,7 +25,6 @@ import scala.language.postfixOps import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.server.Route import akka.http.scaladsl.model.StatusCodes._ @@ -41,6 +40,7 @@ import whisk.core.entity.size._ import whisk.core.entity.test.OldWhiskTrigger import whisk.http.ErrorResponse import whisk.http.Messages +import whisk.core.database.UserContext /** * Tests Trigger API. @@ -61,6 +61,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi { behavior of "Triggers API" val creds = WhiskAuthHelpers.newIdentity() + val context = UserContext(creds) val namespace = EntityPath(creds.subject.asString) val collectionPath = s"/${EntityPath.DEFAULT}/${collection.path}" def aname() = MakeName.next("triggers_tests") @@ -373,8 +374,8 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi { whisk.utils.retry({ println(s"trying to obtain async activation doc: '${activationDoc}'") - val activation = getActivation(ActivationId(activationDoc.asString)) - deleteActivation(ActivationId(activationDoc.asString)) + val activation = getActivation(ActivationId(activationDoc.asString), context) + deleteActivation(ActivationId(activationDoc.asString), context) activation.end should be(Instant.EPOCH) activation.response.result should be(Some(content)) }, 30, Some(1.second)) @@ -396,7 +397,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi { val activationDoc = DocId(WhiskEntity.qualifiedName(namespace, activationId)) whisk.utils.retry({ println(s"trying to delete async activation doc: '${activationDoc}'") - deleteActivation(ActivationId(activationDoc.asString)) + deleteActivation(ActivationId(activationDoc.asString), context) response.fields("activationId") should not be None }, 30, Some(1.second)) }