This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b605943 Cleanup view for entities and augment activations view. (#2760) b605943 is described below commit b605943856d82b4253fa4539137e4e106d2fb07b Author: rodric rabbah <rod...@gmail.com> AuthorDate: Thu Nov 9 06:45:40 2017 -0500 Cleanup view for entities and augment activations view. (#2760) - Factor out design doc name to ansible. - Separate index view into its own design doc. This will cut the main view file size in half which will make activation list faster. It replicates the view in a separate design doc, so the total savings are zero. The additional compute overhead for the ddoc though is worth considering. The upshot: we can adjust filters separately. - Add package prefix if it exists to activation filter. - Allow for filtering activations by package name. --- ...ign_document_for_activations_db_filters_v2.json | 9 ++ ...isks_design_document_for_activations_db_v2.json | 9 ++ .../whisks_design_document_for_entities_db_v2.json | 21 +++++ ansible/group_vars/all | 3 + ansible/roles/controller/tasks/deploy.yml | 5 +- ansible/roles/invoker/tasks/deploy.yml | 3 + ansible/tasks/recreateViews.yml | 3 + ansible/templates/whisk.properties.j2 | 5 +- .../src/main/scala/whisk/core/WhiskConfig.scala | 25 ++++-- .../main/scala/whisk/core/entity/EntityPath.scala | 5 ++ .../scala/whisk/core/entity/WhiskActivation.scala | 60 ++++++++++--- .../main/scala/whisk/core/entity/WhiskStore.scala | 51 +++++++++--- .../src/main/scala/whisk/http/ErrorResponse.scala | 3 +- .../scala/whisk/core/controller/Activations.scala | 55 ++++++------ .../main/scala/whisk/core/invoker/Invoker.scala | 15 +++- .../core/controller/test/ActivationsApiTests.scala | 59 +++++++++++-- .../scala/whisk/core/database/test/DbUtils.scala | 11 ++- .../scala/whisk/core/entity/test/ViewTests.scala | 97 +++++++++------------- 18 files changed, 297 insertions(+), 142 deletions(-) diff --git 
a/ansible/files/whisks_design_document_for_activations_db_filters_v2.json b/ansible/files/whisks_design_document_for_activations_db_filters_v2.json new file mode 100644 index 0000000..14eb0b0 --- /dev/null +++ b/ansible/files/whisks_design_document_for_activations_db_filters_v2.json @@ -0,0 +1,9 @@ +{ + "_id": "_design/whisks-filters.v2", + "language": "javascript", + "views": { + "activations": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n var isActivation = function (doc) { return (doc.activationId !== undefined) };\n var summarize = function (doc) {\n var endtime = doc.end !== 0 ? doc.end : undefined;\n return {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations,\n activationId: doc.activationId,\n start: doc.start,\n end: endtim [...] + } + } +} diff --git a/ansible/files/whisks_design_document_for_activations_db_v2.json b/ansible/files/whisks_design_document_for_activations_db_v2.json new file mode 100644 index 0000000..be57879 --- /dev/null +++ b/ansible/files/whisks_design_document_for_activations_db_v2.json @@ -0,0 +1,9 @@ +{ + "_id": "_design/whisks.v2", + "language": "javascript", + "views": { + "activations": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n var isActivation = function (doc) { return (doc.activationId !== undefined) };\n var summarize = function (doc) {\n var endtime = doc.end !== 0 ? doc.end : undefined;\n return {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations,\n activationId: doc.activationId,\n start: doc.start,\n end: endtim [...] 
+ } + } +} diff --git a/ansible/files/whisks_design_document_for_entities_db_v2.json b/ansible/files/whisks_design_document_for_entities_db_v2.json new file mode 100644 index 0000000..97ed91c --- /dev/null +++ b/ansible/files/whisks_design_document_for_entities_db_v2.json @@ -0,0 +1,21 @@ +{ + "_id": "_design/whisks.v2", + "language": "javascript", + "views": { + "rules": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n var isRule = function (doc) { return (doc.trigger !== undefined) };\n if (isRule(doc)) try {\n var ns = doc.namespace.split(PATHSEP);\n var root = ns[0];\n var date = doc.updated;\n var value = {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations\n };\n emit([doc.namespace, date], value);\n if (root !== doc.nam [...] + }, + "all": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n\n var isPackage = function (doc) { return (doc.binding !== undefined) };\n var isAction = function (doc) { return (doc.exec !== undefined) };\n var isTrigger = function (doc) { return (doc.exec === undefined && doc.binding === undefined && doc.parameters !== undefined) };\n var isRule = function (doc) { return (doc.trigger !== undefined) };\n \n var collection = function (doc) {\n if (isPackage(doc)) return \"packages\"; [...] 
+ }, + "packages": { + "map": "function (doc) {\n var isPackage = function (doc) { return (doc.binding !== undefined) };\n if (isPackage(doc)) try {\n var date = doc.updated;\n emit([doc.namespace, date], {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations,\n binding: Object.keys(doc.binding).length !== 0\n });\n } catch (e) {}\n}" + }, + "actions": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n var isAction = function (doc) { return (doc.exec !== undefined) };\n if (isAction(doc)) try {\n var ns = doc.namespace.split(PATHSEP);\n var root = ns[0];\n var date = doc.updated;\n var value = {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations\n };\n emit([doc.namespace, date], value);\n if (root !== doc.nam [...] + }, + "triggers": { + "map": "function (doc) {\n var PATHSEP = \"/\";\n var isTrigger = function (doc) { return (doc.exec === undefined && doc.binding === undefined && doc.parameters !== undefined) };\n if (isTrigger(doc)) try {\n var ns = doc.namespace.split(PATHSEP);\n var root = ns[0];\n var date = doc.updated;\n var value = {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations\n };\n [...] 
+ } + } +} \ No newline at end of file diff --git a/ansible/group_vars/all b/ansible/group_vars/all index d7dfae9..79642fe 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -198,7 +198,10 @@ db: - whisk.system whisk: actions: "{{ db_prefix }}whisks" + actions_ddoc: "whisks" activations: "{{ db_prefix }}activations" + activations_ddoc: "whisks" + activations_filter_ddoc: "whisks" auth: "{{ db_prefix }}subjects" apigateway: diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index d128053..c669395 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -60,9 +60,12 @@ "DB_PORT": "{{ db_port }}" "DB_USERNAME": "{{ db_username }}" "DB_PASSWORD": "{{ db_password }}" - "DB_WHISK_ACTIONS": "{{ db.whisk.actions }}" "DB_WHISK_AUTHS": "{{ db.whisk.auth }}" + "DB_WHISK_ACTIONS": "{{ db.whisk.actions }}" "DB_WHISK_ACTIVATIONS": "{{ db.whisk.activations }}" + "DB_WHISK_ACTIONS_DDOC": "{{ db.whisk.actions_ddoc }}" + "DB_WHISK_ACTIVATIONS_DDOC": "{{ db.whisk.activations_ddoc }}" + "DB_WHISK_ACTIVATIONS_FILTER_DDOC": "{{ db.whisk.activations_filter_ddoc }}" "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}" "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 2b8dc7c..b7d9fc2 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -129,6 +129,9 @@ -e DB_PASSWORD='{{ db_password }}' -e DB_WHISK_ACTIONS='{{ db.whisk.actions }}' -e DB_WHISK_ACTIVATIONS='{{ db.whisk.activations }}' + -e DB_WHISK_ACTIONS_DDOC='{{ db.whisk.actions_ddoc }}' + -e DB_WHISK_ACTIVATIONS_DDOC='{{ db.whisk.activations_ddoc }}' + -e DB_WHISK_ACTIVATIONS_FILTER_DDOC='{{ db.whisk.activations_filter_ddoc }}' -e WHISK_API_HOST_PROTO='{{ whisk_api_host_proto | default('https') }}' -e WHISK_API_HOST_PORT='{{ 
whisk_api_host_port | default('443') }}' -e WHISK_API_HOST_NAME='{{ whisk_api_host_name | default(groups['edge'] | first) }}' diff --git a/ansible/tasks/recreateViews.yml b/ansible/tasks/recreateViews.yml index e953320..682ca94 100644 --- a/ansible/tasks/recreateViews.yml +++ b/ansible/tasks/recreateViews.yml @@ -7,6 +7,7 @@ doc: "{{ lookup('file', '{{ item }}') }}" with_items: - "{{ openwhisk_home }}/ansible/files/whisks_design_document_for_actions_db.json" + - "{{ openwhisk_home }}/ansible/files/whisks_design_document_for_entities_db_v2.json" - "{{ openwhisk_home }}/ansible/files/filter_design_document.json" - include: db/recreateDoc.yml @@ -15,6 +16,8 @@ doc: "{{ lookup('file', '{{ item }}') }}" with_items: - "{{ openwhisk_home }}/ansible/files/whisks_design_document_for_activations_db.json" + - "{{ openwhisk_home }}/ansible/files/whisks_design_document_for_activations_db_v2.json" + - "{{ openwhisk_home }}/ansible/files/whisks_design_document_for_activations_db_filters_v2.json" - "{{ openwhisk_home }}/ansible/files/filter_design_document.json" - "{{ openwhisk_home }}/ansible/files/activations_design_document_for_activations_db.json" - "{{ openwhisk_home }}/ansible/files/logCleanup_design_document_for_activations_db.json" diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2 index e23e624..2c1d644 100644 --- a/ansible/templates/whisk.properties.j2 +++ b/ansible/templates/whisk.properties.j2 @@ -87,9 +87,12 @@ db.port={{ db_port }} db.username={{ db_username }} db.password={{ db_password }} db.prefix={{ db_prefix }} +db.whisk.auths={{ db.whisk.auth }} db.whisk.actions={{ db.whisk.actions }} db.whisk.activations={{ db.whisk.activations }} -db.whisk.auths={{ db.whisk.auth }} +db.whisk.actions.ddoc={{ db.whisk.actions_ddoc }} +db.whisk.activations.ddoc={{ db.whisk.activations_ddoc }} +db.whisk.activations.filter.ddoc={{ db.whisk.activations_filter_ddoc }} apigw.auth.user={{apigw_auth_user}} apigw.auth.pwd={{apigw_auth_pwd}} diff 
--git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 56da847..726d1bc 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -30,12 +30,13 @@ import whisk.common.{Config, Logging} * a value, and whose values are default values. A null value in the Map means there is * no default value specified, so it must appear in the properties file. * @param optionalProperties a set of optional properties (which may not be defined). - * @param whiskPropertiesFile a File object, the whisk.properties file, which if given contains the property values. + * @param propertiesFile a File object, the whisk.properties file, which if given contains the property values. + * @param env an optional environment to initialize from. */ class WhiskConfig(requiredProperties: Map[String, String], optionalProperties: Set[String] = Set(), propertiesFile: File = null, - env: Map[String, String] = sys.env)(implicit val logging: Logging) + env: Map[String, String] = sys.env)(implicit logging: Logging) extends Config(requiredProperties, optionalProperties)(env) { /** @@ -89,10 +90,10 @@ class WhiskConfig(requiredProperties: Map[String, String], val dbProtocol = this(WhiskConfig.dbProtocol) val dbHost = this(WhiskConfig.dbHost) val dbPort = this(WhiskConfig.dbPort) - val dbWhisk = this(WhiskConfig.dbWhisk) + val dbPrefix = this(WhiskConfig.dbPrefix) val dbAuths = this(WhiskConfig.dbAuths) + val dbWhisk = this(WhiskConfig.dbWhisk) val dbActivations = this(WhiskConfig.dbActivations) - val dbPrefix = this(WhiskConfig.dbPrefix) val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint) @@ -104,11 +105,15 @@ class WhiskConfig(requiredProperties: Map[String, String], val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit) val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes) val controllerLocalBookkeeping = 
getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false) - } object WhiskConfig { + /** + * Reads a key from system environment as if it was part of WhiskConfig. + */ + def readFromEnv(key: String): Option[String] = sys.env.get(asEnvVar(key)) + private def whiskPropertiesFile: File = { def propfile(dir: String, recurse: Boolean = false): File = if (dir != null) { @@ -153,10 +158,11 @@ object WhiskConfig { } } - def asEnvVar(key: String): String = + def asEnvVar(key: String): String = { if (key != null) key.replace('.', '_').toUpperCase else null + } val servicePort = "port" val dockerRegistry = "docker.registry" @@ -170,10 +176,13 @@ object WhiskConfig { val dbPort = "db.port" val dbUsername = "db.username" val dbPassword = "db.password" - val dbWhisk = "db.whisk.actions" - val dbAuths = "db.whisk.auths" val dbPrefix = "db.prefix" + val dbAuths = "db.whisk.auths" + val dbWhisk = "db.whisk.actions" val dbActivations = "db.whisk.activations" + val dbWhiskDesignDoc = "db.whisk.actions.ddoc" + val dbActivationsDesignDoc = "db.whisk.activations.ddoc" + val dbActivationsFilterDesignDoc = "db.whisk.activations.filter.ddoc" // these are not private because they are needed // in the invoker (they are part of the environment diff --git a/common/scala/src/main/scala/whisk/core/entity/EntityPath.scala b/common/scala/src/main/scala/whisk/core/entity/EntityPath.scala index 26e1fb1..c4836c3 100644 --- a/common/scala/src/main/scala/whisk/core/entity/EntityPath.scala +++ b/common/scala/src/main/scala/whisk/core/entity/EntityPath.scala @@ -43,6 +43,11 @@ protected[core] class EntityPath private (private val path: Seq[String]) extends def namespace: String = path.foldLeft("")((a, b) => if (a != "") a.trim + EntityPath.PATHSEP + b.trim else b.trim) /** + * @return number of parts in the path. + */ + def segments = path.length + + /** * Adds segment to path. 
*/ def addPath(e: EntityName) = EntityPath(path :+ e.name) diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala index 3dca9c1..4073093 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala @@ -23,12 +23,13 @@ import scala.concurrent.Future import scala.util.Try import spray.json._ -import spray.json.DefaultJsonProtocol import spray.json.DefaultJsonProtocol._ import whisk.common.TransactionId import whisk.core.database.ArtifactStore import whisk.core.database.DocumentFactory import whisk.core.database.StaleParameter +import whisk.core.WhiskConfig +import whisk.core.WhiskConfig.{dbActivationsDesignDoc, dbActivationsFilterDesignDoc} /** * A WhiskActivation provides an abstraction of the meta-data @@ -77,8 +78,25 @@ case class WhiskActivation(namespace: EntityPath, /** This the activation summary as computed by the database view. Strictly used for testing. 
*/ override def summaryAsJson = { - val JsObject(fields) = super.summaryAsJson - JsObject(fields + ("activationId" -> activationId.toJson)) + import WhiskActivation.instantSerdes + val summary = JsObject(super.summaryAsJson.fields + ("activationId" -> activationId.toJson)) + + def actionOrNot() = { + if (end != Instant.EPOCH) { + Map( + "end" -> end.toJson, + "duration" -> (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson, + "statusCode" -> response.statusCode.toJson) + } else Map.empty + } + + if (WhiskActivation.mainDdoc.endsWith(".v2")) { + JsObject( + summary.fields + + ("start" -> start.toJson) ++ + cause.map(("cause" -> _.toJson)) ++ + actionOrNot()) + } else summary } def resultAsJson = response.result.toJson.asJsObject @@ -94,8 +112,10 @@ case class WhiskActivation(namespace: EntityPath, } } - def withoutLogsOrResult = + def withoutLogsOrResult = { copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev) + } + def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev) def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev) } @@ -119,6 +139,24 @@ object WhiskActivation } override val collectionName = "activations" + + // FIXME: reading the design doc from sys.env instead of a canonical property reader + // because WhiskConfig requires a logger, which requires an actor system, neither of + // which are readily available here; rather than introduce significant refactoring, + // defer this fix until WhiskConfig is refactored itself, which is planned to introduce + // type safe properties + private val mainDdoc = WhiskConfig.readFromEnv(dbActivationsDesignDoc).getOrElse("whisks") + private val filtersDdoc = WhiskConfig.readFromEnv(dbActivationsFilterDesignDoc).getOrElse("whisks") + + /** The main view for activations, keyed by namespace, sorted by date. 
*/ + override lazy val view = WhiskEntityQueries.view(mainDdoc, collectionName) + + /** + * A view for activations in a namespace additionally keyed by action name + * (and package name if present) sorted by date. + */ + private val filtersView = WhiskEntityQueries.view(filtersDdoc, collectionName) + override implicit val serdes = jsonFormat13(WhiskActivation.apply) // Caching activations doesn't make much sense in the common case as usually, @@ -134,18 +172,18 @@ object WhiskActivation */ def listActivationsMatchingName(db: ArtifactStore[WhiskActivation], namespace: EntityPath, - name: EntityName, + path: EntityPath, skip: Int, limit: Int, - docs: Boolean = false, + includeDocs: Boolean = false, since: Option[Instant] = None, upto: Option[Instant] = None, stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = { - import WhiskEntityQueries._ - val convert = if (docs) Some((o: JsObject) => Try { serdes.read(o) }) else None - val startKey = List(namespace.addPath(name).asString, since map { _.toEpochMilli } getOrElse 0) - val endKey = List(namespace.addPath(name).asString, upto map { _.toEpochMilli } getOrElse TOP, TOP) - query(db, viewname(collectionName), startKey, endKey, skip, limit, reduce = false, stale, convert) + import WhiskEntityQueries.TOP + val convert = if (includeDocs) Some((o: JsObject) => Try { serdes.read(o) }) else None + val startKey = List(namespace.addPath(path).asString, since map { _.toEpochMilli } getOrElse 0) + val endKey = List(namespace.addPath(path).asString, upto map { _.toEpochMilli } getOrElse TOP, TOP) + query(db, filtersView, startKey, endKey, skip, limit, reduce = false, stale, convert) } } diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index df8d1fe..28addc2 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ 
b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -22,7 +22,6 @@ import java.time.Instant import scala.concurrent.Future import scala.language.postfixOps import scala.util.Try - import akka.actor.ActorSystem import akka.stream.ActorMaterializer import spray.json.JsObject @@ -40,6 +39,8 @@ import whisk.core.WhiskConfig.dbProtocol import whisk.core.WhiskConfig.dbProvider import whisk.core.WhiskConfig.dbUsername import whisk.core.WhiskConfig.dbWhisk +import whisk.core.WhiskConfig.dbWhiskDesignDoc +import whisk.core.WhiskConfig.dbActivationsDesignDoc import whisk.core.database.ArtifactStore import whisk.core.database.ArtifactStoreProvider import whisk.core.database.DocumentRevisionProvider @@ -111,7 +112,8 @@ object WhiskEntityStore { dbPassword -> null, dbHost -> null, dbPort -> null, - dbWhisk -> null) + dbWhisk -> null, + dbWhiskDesignDoc -> null) def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = SpiLoader @@ -129,13 +131,26 @@ object WhiskActivationStore { dbPassword -> null, dbHost -> null, dbPort -> null, - dbActivations -> null) + dbActivations -> null, + dbActivationsDesignDoc -> null) def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations, true) } /** + * A class to type the design doc and view within a database. + * + * @param ddoc the design document + * @param view the view name within the design doc + */ +protected[core] class View(ddoc: String, view: String) { + + /** The name of the table to query. */ + val name = s"$ddoc/$view" +} + +/** * This object provides some utilities that query the whisk datastore. * The datastore is assumed to have views (pre-computed joins or indexes) * for each of the whisk collection types. 
Entities may be queries by @@ -169,14 +184,23 @@ object WhiskActivationStore { */ object WhiskEntityQueries { val TOP = "\ufff0" - val WHISKVIEW = "whisks" - val ALL = "all" + + /** The design document to use for queries. */ + // FIXME: reading the design doc from sys.env instead of a canonical property reader + // because WhiskConfig requires a logger, which requires an actor system, neither of + // which are readily available here; rather than introduce significant refactoring, + // defer this fix until WhiskConfig is refactored itself, which is planned to introduce + // type safe properties + val designDoc = WhiskConfig.readFromEnv(dbWhiskDesignDoc).getOrElse("whisks") + + /** The view name for the collection, within the design document. */ + def view(ddoc: String = designDoc, collection: String) = new View(ddoc, collection) /** - * Determines the view name for the collection. There are two cases: a view - * that is namespace specific, or namespace agnostic.. + * Name of view in design-doc that lists all entities in that views regardless of types. + * This is uses in the namespace API, and also in tests to check preconditions. 
*/ - def viewname(collection: String): String = s"$WHISKVIEW/$collection" + val viewAll: View = view(collection = "all") /** * Queries the datastore for all entities in a namespace, and converts the list of entities @@ -191,7 +215,7 @@ object WhiskEntityQueries { implicit val ec = db.executionContext val startKey = List(namespace.asString) val endKey = List(namespace.asString, TOP) - db.query(viewname(ALL), startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false, stale = stale) map { + db.query(viewAll.name, startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false, stale = stale) map { _ map { row => val value = row.fields("value").asJsObject val JsString(collection) = value.fields("collection") @@ -206,6 +230,9 @@ trait WhiskEntityQueries[T] { val serdes: RootJsonFormat[T] import WhiskEntityQueries._ + /** The view name for the collection, within the design document. */ + lazy val view: View = WhiskEntityQueries.view(collection = collectionName) + /** * Queries the datastore for records from a specific collection (i.e., type) matching * the given path (which should be one namespace, or namespace + package name). 
@@ -225,12 +252,12 @@ trait WhiskEntityQueries[T] { val convert = if (includeDocs) Some((o: JsObject) => Try { serdes.read(o) }) else None val startKey = List(path.asString, since map { _.toEpochMilli } getOrElse 0) val endKey = List(path.asString, upto map { _.toEpochMilli } getOrElse TOP, TOP) - query(db, viewname(collectionName), startKey, endKey, skip, limit, reduce = false, stale, convert) + query(db, view, startKey, endKey, skip, limit, reduce = false, stale, convert) } protected[entity] def query[A <: WhiskEntity]( db: ArtifactStore[A], - view: String, + view: View, startKey: List[Any], endKey: List[Any], skip: Int, @@ -240,7 +267,7 @@ trait WhiskEntityQueries[T] { convert: Option[JsObject => Try[T]])(implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = { implicit val ec = db.executionContext val includeDocs = convert.isDefined - db.query(view, startKey, endKey, skip, limit, includeDocs, true, reduce, stale) map { rows => + db.query(view.name, startKey, endKey, skip, limit, includeDocs, true, reduce, stale) map { rows => convert map { fn => Right(rows flatMap { row => fn(row.fields("doc").asJsObject) toOption diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index 09c3bfe..719a6c5 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -113,8 +113,7 @@ object Messages { val abnormalInitialization = "The action did not initialize and exited unexpectedly." val abnormalRun = "The action did not produce a valid response and exited unexpectedly." val memoryExhausted = "The action exhausted its memory and was aborted." 
- def badEntityName(value: String) = s"Parameter is not a valid value for a entity name: $value" - def badNamespace(value: String) = s"Parameter is not a valid value for a namespace: $value" + def badNameFilter(value: String) = s"Parameter may be a 'simple' name or 'package-name/simple' name: $value" def badEpoch(value: String) = s"Parameter is not a valid value for epoch seconds: $value" /** Error message for size conformance. */ diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala index 0d7dbbc..0403d1c 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala @@ -43,6 +43,25 @@ import whisk.http.Messages object WhiskActivationsApi { protected[core] val maxActivationLimit = 200 + + /** Custom unmarshaller for query parameters "name" into valid package/action name path. */ + private implicit val stringToRestrictedEntityPath: Unmarshaller[String, Option[EntityPath]] = + Unmarshaller.strict[String, Option[EntityPath]] { value => + Try { EntityPath(value) } match { + case Success(e) if e.segments <= 2 => Some(e) + case _ if value.trim.isEmpty => None + case _ => throw new IllegalArgumentException(Messages.badNameFilter(value)) + } + } + + /** Custom unmarshaller for query parameters "since" and "upto" into a valid Instant. */ + private implicit val stringToInstantDeserializer: Unmarshaller[String, Instant] = + Unmarshaller.strict[String, Instant] { value => + Try { Instant.ofEpochMilli(value.toLong) } match { + case Success(e) => e + case Failure(t) => throw new IllegalArgumentException(Messages.badEpoch(value)) + } + } } /** A trait implementing the activations API. 
*/ @@ -118,24 +137,27 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 500 Internal Server Error */ private def list(namespace: EntityPath)(implicit transid: TransactionId) = { + import WhiskActivationsApi.stringToRestrictedEntityPath + import WhiskActivationsApi.stringToInstantDeserializer + parameter( 'skip ? 0, 'limit ? collection.listLimit, 'count ? false, 'docs ? false, - 'name.as[EntityName] ?, + 'name.as[Option[EntityPath]] ?, 'since.as[Instant] ?, 'upto.as[Instant] ?) { (skip, limit, count, docs, name, since, upto) => val cappedLimit = if (limit == 0) WhiskActivationsApi.maxActivationLimit else limit // regardless of limit, cap at maxActivationLimit (200) records, client must paginate if (cappedLimit <= WhiskActivationsApi.maxActivationLimit) { - val activations = name match { + val activations = name.flatten match { case Some(action) => WhiskActivation.listActivationsMatchingName( activationStore, namespace, - action, + action.last.toPath, skip, cappedLimit, docs, @@ -215,31 +237,4 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit docid, (activation: WhiskActivation) => logStore.fetchLogs(activation).map(_.toJsonObject)) } - - /** Custom unmarshaller for query parameters "name" into valid entity name. */ - private implicit val stringToEntityName: Unmarshaller[String, EntityName] = - Unmarshaller.strict[String, EntityName] { value => - Try { EntityName(value) } match { - case Success(e) => e - case Failure(t) => throw new IllegalArgumentException(Messages.badEntityName(value)) - } - } - - /** Custom unmarshaller for query parameters "name" into valid namespace. 
*/ - private implicit val stringToNamespace: Unmarshaller[String, EntityPath] = - Unmarshaller.strict[String, EntityPath] { value => - Try { EntityPath(value) } match { - case Success(e) => e - case Failure(t) => throw new IllegalArgumentException(Messages.badNamespace(value)) - } - } - - /** Custom unmarshaller for query parameters "since" and "upto" into a valid Instant. */ - private implicit val stringToInstantDeserializer: Unmarshaller[String, Instant] = - Unmarshaller.strict[String, Instant] { value => - Try { Instant.ofEpochMilli(value.toLong) } match { - case Success(e) => e - case Failure(t) => throw new IllegalArgumentException(Messages.badEpoch(value)) - } - } } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 8c57206..192100d 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -137,19 +137,22 @@ object Invoker { if (invokerName.trim.isEmpty) { abort("Invoker name can't be empty to use dynamicId assignment.") } + logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHost}") val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHost, retryPolicy) zkClient.start() - zkClient.blockUntilConnected(); + zkClient.blockUntilConnected() logger.info(this, "invokerReg: connected to zookeeper") + val myIdPath = "/invokers/idAssignment/mapping/" + invokerName val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match { case None => - // path doesn't exist ==> no previous mapping for this invoker + // path doesn't exist -> no previous mapping for this invoker logger.info(this, s"invokerReg: no prior assignment of id for invoker $invokerName") val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0) idCounter.start() + 
def assignId(): Int = { val current = idCounter.getVersionedValue() if (idCounter.trySetCount(current, current.getValue() + 1)) { @@ -158,22 +161,26 @@ object Invoker { assignId() } } + val newId = assignId() idCounter.close() zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray) logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}") newId + case Some(_) => - // path already exists ==> there is a previous mapping for this invoker we should use + // path already exists -> there is a previous mapping for this invoker we should use val rawOldId = zkClient.getData().forPath(myIdPath) val oldId = BigInt(rawOldId).intValue logger.info(this, s"invokerReg: invoker ${invokerName} was assigned its previous invokerId ${oldId}") oldId } + zkClient.close() assignedId } - val invokerInstance = InstanceId(assignedInvokerId); + + val invokerInstance = InstanceId(assignedInvokerId) val msgProvider = SpiLoader.get[MessagingProvider] val producer = msgProvider.getProducer(config, ec) val invoker = try { diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala index 548c612..9f99eda 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala @@ -80,7 +80,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi WhiskActivation(namespace, actionName, creds.subject, ActivationId(), start = Instant.now, end = Instant.now) }.toList activations foreach { put(activationStore, _) } - waitOnView(activationStore, namespace.root, 2, WhiskActivation.collectionName) + waitOnView(activationStore, namespace.root, 2, WhiskActivation.view) whisk.utils.retry { Get(s"$collectionPath") ~> Route.seal(routes(creds)) ~> check { status should be(OK) @@ -152,7 +152,7 @@ class ActivationsApiTests 
extends ControllerTestCommon with WhiskActivationsApi response = ActivationResponse.success(Some(JsNumber(5)))) }.toList activations foreach { put(activationStore, _) } - waitOnView(activationStore, namespace.root, 2, WhiskActivation.collectionName) + waitOnView(activationStore, namespace.root, 2, WhiskActivation.view) whisk.utils.retry { Get(s"$collectionPath?docs=true") ~> Route.seal(routes(creds)) ~> check { @@ -216,7 +216,7 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi start = now.plusSeconds(30), end = now.plusSeconds(20))) // should match activations foreach { put(activationStore, _) } - waitOnView(activationStore, namespace.root, activations.length, WhiskActivation.collectionName) + waitOnView(activationStore, namespace.root, activations.length, WhiskActivation.view) // get between two time stamps whisk.utils.retry { @@ -267,6 +267,20 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } //// GET /activations?name=xyz + it should "accept valid name parameters and reject invalid ones" in { + implicit val tid = transid() + + Seq(("", OK), ("name=", OK), ("name=abc", OK), ("name=abc/xyz", OK), ("name=abc/xyz/123", BadRequest)).foreach { + case (p, s) => + Get(s"$collectionPath?$p") ~> Route.seal(routes(creds)) ~> check { + status should be(s) + if (s == BadRequest) { + responseAs[String] should include(Messages.badNameFilter(p.drop(5))) + } + } + } + } + it should "get summary activation by namespace and action name" in { implicit val tid = transid() @@ -292,18 +306,46 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi end = Instant.now) }.toList activations foreach { put(activationStore, _) } - waitOnView(activationStore, namespace.root, 2, WhiskActivation.collectionName) + + val activationsInPackage = (1 to 2).map { i => + WhiskActivation( + namespace, + EntityName(s"xyz"), + creds.subject, + ActivationId(), + start = Instant.now, + end = Instant.now, + 
annotations = Parameters("path", s"${namespace.asString}/pkg/xyz")) + }.toList + activationsInPackage foreach { put(activationStore, _) } + + waitOnView(activationStore, namespace.root, 4, WhiskActivation.view) whisk.utils.retry { Get(s"$collectionPath?name=xyz") ~> Route.seal(routes(creds)) ~> check { status should be(OK) val response = responseAs[List[JsObject]] - activations.length should be(response.length) - activations forall { a => + val allActivations = activations ++ activationsInPackage + allActivations.length should be(response.length) + allActivations forall { a => response contains a.summaryAsJson } should be(true) } } + + // this is not yet ready, the v2 views must be activated + if (false) { + whisk.utils.retry { + Get(s"$collectionPath?name=pkg/xyz") ~> Route.seal(routes(creds)) ~> check { + status should be(OK) + val response = responseAs[List[JsObject]] + activationsInPackage.length should be(response.length) + activationsInPackage forall { a => + response contains a.summaryAsJson + } should be(true) + } + } + } } it should "reject activation list when limit is greater than maximum allowed value" in { @@ -311,8 +353,9 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi val exceededMaxLimit = WhiskActivationsApi.maxActivationLimit + 1 val response = Get(s"$collectionPath?limit=$exceededMaxLimit") ~> Route.seal(routes(creds)) ~> check { val response = responseAs[String] - response should include( - Messages.maxActivationLimitExceeded(exceededMaxLimit, WhiskActivationsApi.maxActivationLimit)) + response should include { + Messages.maxActivationLimitExceeded(exceededMaxLimit, WhiskActivationsApi.maxActivationLimit) + } status should be(BadRequest) } } diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala index a01e04f..3c77f4e 100644 --- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala +++ 
b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala @@ -88,7 +88,7 @@ trait DbUtils extends TransactionCounter { * where the step performs a direct db query to retrieve the view and check the count * matches the given value. */ - def waitOnView[Au](db: ArtifactStore[Au], namespace: EntityName, count: Int, viewName: String)( + def waitOnView[Au](db: ArtifactStore[Au], namespace: EntityName, count: Int, view: View)( implicit context: ExecutionContext, transid: TransactionId, timeout: Duration) = { @@ -96,11 +96,10 @@ trait DbUtils extends TransactionCounter { () => { val startKey = List(namespace.asString) val endKey = List(namespace.asString, WhiskEntityQueries.TOP) - db.query(WhiskEntityQueries.viewname(viewName), startKey, endKey, 0, 0, false, true, false, StaleParameter.No) map { - l => - if (l.length != count) { - throw RetryOp() - } else true + db.query(view.name, startKey, endKey, 0, 0, false, true, false, StaleParameter.No) map { l => + if (l.length != count) { + throw RetryOp() + } else true } }, timeout) diff --git a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala index 2347170..55b2e69 100644 --- a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala @@ -94,7 +94,7 @@ class ViewTests val result = Await.result(listAllInNamespace(store, ns.root, false, StaleParameter.No), dbOpTimeout).values.toList.flatten val expected = entities filter { _.namespace.root.toPath == ns } - result.length should be(expected.length) + result should have length expected.length result should contain theSameElementsAs expected.map(_.summaryAsJson) } @@ -108,7 +108,7 @@ class ViewTests .get .map(e => e) val expected = entities filter { _.namespace.root.toPath == ns } - result.length should be(expected.length) + result should have length expected.length result should contain theSameElementsAs expected.map(_.summaryAsJson) } @@ 
-120,10 +120,8 @@ class ViewTests } val expected = entities filter { !_.isInstanceOf[WhiskActivation] } filter { _.namespace.root.toPath == ns } map.get(WhiskActivation.collectionName) should be(None) - result.length should be(expected.length) - expected forall { e => - result contains e.summaryAsJson - } should be(true) + result should have length expected.length + result should contain theSameElementsAs expected.map(_.summaryAsJson) } def resolveListMethodForKind(kind: String) = kind match { @@ -140,16 +138,13 @@ class ViewTests f: (WhiskEntity) => Boolean)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() val q = resolveListMethodForKind(kind) - val result = Await.result(q.listCollectionInNamespace(store, ns, 0, 0, stale = StaleParameter.No) map { - _.left.get.map(e => e) - }, dbOpTimeout) + val result = + Await.result(q.listCollectionInNamespace(store, ns, 0, 0, stale = StaleParameter.No).map(_.left.get), dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } - result.length should be(expected.length) - expected forall { e => - result contains e.summaryAsJson - } should be(true) + result should have length expected.length + result should contain theSameElementsAs expected.map(_.summaryAsJson) } def getKindInNamespaceWithDoc[T](ns: EntityPath, kind: String, f: (WhiskEntity) => Boolean)( @@ -158,17 +153,14 @@ class ViewTests val q = resolveListMethodForKind(kind) val result = Await.result( - q.listCollectionInNamespace(entityStore, ns, 0, 0, includeDocs = true, stale = StaleParameter.No) map { - _.right.get - }, + q.listCollectionInNamespace(entityStore, ns, 0, 0, includeDocs = true, stale = StaleParameter.No) + .map(_.right.get), dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } - result.length should be(expected.length) - expected forall { e => - result contains e - } should be(true) + result should have length expected.length + result should contain 
theSameElementsAs expected } def getKindInNamespaceByName[Au <: WhiskEntity](store: ArtifactStore[Au], @@ -180,18 +172,14 @@ class ViewTests implicit val tid = transid() val q = resolveListMethodForKind(kind) val result = - Await.result(q.listCollectionInNamespace(store, ns.addPath(name), 0, 0, stale = StaleParameter.No) map { - _.left.get map { e => - e - } - }, dbOpTimeout) + Await.result( + q.listCollectionInNamespace(store, ns.addPath(name), 0, 0, stale = StaleParameter.No).map(_.left.get), + dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } - result.length should be(expected.length) - expected forall { e => - result contains e.summaryAsJson - } should be(true) + result should have length expected.length + result should contain theSameElementsAs expected.map(_.summaryAsJson) } def getActivationsInNamespaceByName(store: ArtifactStore[WhiskActivation], @@ -200,36 +188,30 @@ class ViewTests f: (WhiskEntity) => Boolean)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() val result = - Await.result(WhiskActivation.listActivationsMatchingName(store, ns, name, 0, 0, stale = StaleParameter.No) map { - _.left.get map { e => - e - } - }, dbOpTimeout) + Await.result( + WhiskActivation + .listActivationsMatchingName(store, ns, name.toPath, 0, 0, stale = StaleParameter.No) + .map(_.left.get), + dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } - result.length should be(expected.length) - expected forall { e => - result contains e.summaryAsJson - } should be(true) + result should have length expected.length + result should contain theSameElementsAs expected.map(_.summaryAsJson) } def getKindInPackage(ns: EntityPath, kind: String, f: (WhiskEntity) => Boolean)( implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() val q = resolveListMethodForKind(kind) - val result = Await.result(q.listCollectionInNamespace(entityStore, ns, 0, 0, stale = StaleParameter.No) map { 
- _.left.get map { e => - e - } - }, dbOpTimeout) + val result = Await.result( + q.listCollectionInNamespace(entityStore, ns, 0, 0, stale = StaleParameter.No).map(_.left.get), + dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace == ns } - result.length should be(expected.length) - expected forall { e => - result contains e.summaryAsJson - } should be(true) + result should have length expected.length + result should contain theSameElementsAs expected.map(_.summaryAsJson) } def getActivationsInNamespaceByNameSortedByDate(store: ArtifactStore[WhiskActivation], @@ -244,16 +226,13 @@ class ViewTests implicit val tid = transid() val result = Await.result( WhiskActivation - .listActivationsMatchingName(store, ns, name, skip, count, false, start, end, StaleParameter.No) map { - _.left.get map { e => - e - } - }, + .listActivationsMatchingName(store, ns, name.toPath, skip, count, false, start, end, StaleParameter.No) + .map(_.left.get), dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } sortBy { case (e: WhiskActivation) => e.start.toEpochMilli; case _ => 0 } map { _.summaryAsJson } - result.length should be(expected.length) + result should have length expected.length result should be(expected reverse) } @@ -306,8 +285,8 @@ class ViewTests WhiskPackage(namespace2, aname(), Some(Binding(namespace1.root, aname())))) entities foreach { put(entityStore, _) } - waitOnView(entityStore, namespace1.root, 15, WhiskEntityQueries.ALL) - waitOnView(entityStore, namespace2.root, 14, WhiskEntityQueries.ALL) + waitOnView(entityStore, namespace1.root, 15, WhiskEntityQueries.viewAll) + waitOnView(entityStore, namespace2.root, 14, WhiskEntityQueries.viewAll) getAllInNamespace(entityStore, namespace1) getKindInNamespace(entityStore, namespace1, "actions", { @@ -375,8 +354,8 @@ class ViewTests WhiskActivation(namespace2, actionName, Subject(), ActivationId(), start = now, end = now)) entities foreach { put(activationStore, _) } - 
waitOnView(activationStore, namespace1.root, 2, WhiskActivation.collectionName) - waitOnView(activationStore, namespace2.root, 3, WhiskActivation.collectionName) + waitOnView(activationStore, namespace1.root, 2, WhiskActivation.view) + waitOnView(activationStore, namespace2.root, 3, WhiskActivation.view) getAllActivationsInNamespace(activationStore, namespace1) getKindInNamespace(activationStore, namespace1, "activations", { @@ -431,7 +410,7 @@ class ViewTests end = now.plusSeconds(20))) entities foreach { put(activationStore, _) } - waitOnView(activationStore, namespace1.root, entities.length, WhiskActivation.collectionName) + waitOnView(activationStore, namespace1.root, entities.length, WhiskActivation.view) getActivationsInNamespaceByNameSortedByDate( activationStore, @@ -472,7 +451,7 @@ class ViewTests Seq(WhiskAction(namespace1, aname(), jsDefault("??")), WhiskAction(namespace1, aname(), jsDefault("??"))) entities foreach { put(entityStore, _) } - waitOnView(entityStore, namespace1.root, entities.length, WhiskEntityQueries.ALL) + waitOnView(entityStore, namespace1.root, entities.length, WhiskEntityQueries.viewAll) getKindInNamespaceWithDoc[WhiskAction](namespace1, "actions", { case (e: WhiskAction) => true case (_) => false -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.