This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new cfd181b Use the stale parameter on listing activations. (#2632) cfd181b is described below commit cfd181b30d0c7af622d8f0af3768bf1602cdf355 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Wed Aug 23 13:49:13 2017 +0200 Use the stale parameter on listing activations. (#2632) --- .../scala/whisk/core/database/ArtifactStore.scala | 10 ++++- .../whisk/core/database/CouchDbRestClient.scala | 2 + .../whisk/core/database/CouchDbRestStore.scala | 3 +- .../main/scala/whisk/core/entity/Identity.scala | 6 ++- .../main/scala/whisk/core/entity/WhiskStore.scala | 47 +++++++++++++--------- .../scala/whisk/core/controller/Activations.scala | 23 ++++++----- .../test/scala/system/basic/WskConsoleTests.scala | 2 +- .../scala/whisk/core/database/test/DbUtils.scala | 3 +- .../scala/whisk/core/entity/test/ViewTests.scala | 13 +++--- 9 files changed, 68 insertions(+), 41 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala index 51875d5..fc0638b 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala @@ -28,6 +28,14 @@ import whisk.common.Logging import whisk.common.TransactionId import whisk.core.entity.DocInfo +abstract class StaleParameter(val value: Option[String]) + +object StaleParameter { + case object Ok extends StaleParameter(Some("ok")) + case object UpdateAfter extends StaleParameter(Some("update_after")) + case object No extends StaleParameter(None) +} + /** Basic client to put and delete artifacts in a data store. */ trait ArtifactStore[DocumentAbstraction] { @@ -84,7 +92,7 @@ trait ArtifactStore[DocumentAbstraction] { * @param transid the transaction id for logging * @return a future that completes with List[JsObject] of all documents from view between start and end key (list may be empty) */ - protected[core] def query(table: String, startKey: List[Any], endKey: List[Any], skip: Int, limit: Int, includeDocs: Boolean, descending: Boolean, reduce: Boolean)( + protected[core] def query(table: String, startKey: List[Any], endKey: List[Any], skip: Int, limit: Int, includeDocs: Boolean, descending: Boolean, reduce: Boolean, stale: StaleParameter)( implicit transid: TransactionId): Future[List[JsObject]] /** diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala index ae168e8..dfca680 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala @@ -174,6 +174,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str endKey: List[Any] = Nil, skip: Option[Int] = None, limit: Option[Int] = None, + stale: StaleParameter = StaleParameter.No, includeDocs: Boolean = false, descending: Boolean = false, reduce: Boolean = false, @@ -205,6 +206,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str "endkey" -> list2OptJson(endKey).map(_.toString), "skip" -> skip.filter(_ > 0).map(_.toString), "limit" -> limit.filter(_ > 0).map(_.toString), + "stale" -> stale.value, "include_docs" -> Some(includeDocs).filter(identity).map(_.toString), "descending" -> Some(descending).filter(identity).map(_.toString), "reduce" -> Some(reduce).map(_.toString), diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index 99cecb0..34feccb 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -174,7 +174,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( reportFailure(f, failure => transid.failed(this, start, s"[GET] '$dbName' internal error, doc: '$doc', failure: '${failure.getMessage}'", ErrorLevel)) } - override protected[core] def query(table: String, startKey: List[Any], endKey: List[Any], skip: Int, limit: Int, includeDocs: Boolean, descending: Boolean, reduce: Boolean)( + override protected[core] def query(table: String, startKey: List[Any], endKey: List[Any], skip: Int, limit: Int, includeDocs: Boolean, descending: Boolean, reduce: Boolean, stale: StaleParameter)( implicit transid: TransactionId): Future[List[JsObject]] = { require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true") @@ -196,6 +196,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( endKey = realEndKey, skip = Some(skip), limit = Some(limit), + stale = stale, includeDocs = includeDocs, descending = descending, reduce = reduce) diff --git a/common/scala/src/main/scala/whisk/core/entity/Identity.scala b/common/scala/src/main/scala/whisk/core/entity/Identity.scala index 38bda86..d272f7a 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Identity.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Identity.scala @@ -18,6 +18,7 @@ package whisk.core.entity import scala.concurrent.Future +import scala.util.Try import spray.json._ import types.AuthStore @@ -25,9 +26,9 @@ import whisk.common.Logging import whisk.common.TransactionId import whisk.core.database.MultipleReadersSingleWriterCache import whisk.core.database.NoDocumentException +import whisk.core.database.StaleParameter import whisk.core.entitlement.Privilege import whisk.core.entitlement.Privilege.Privilege -import scala.util.Try case class UserLimits(invocationsPerMinute: Option[Int] = None, concurrentInvocations: Option[Int] = None, firesPerMinute: Option[Int] = None) @@ -104,7 +105,8 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with limit = limit, includeDocs = true, descending = true, - reduce = false) + reduce = false, + stale = StaleParameter.No) } private def rowToIdentity(row: JsObject, key: String)( diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index c25ff2e..0edadb2 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -18,6 +18,11 @@ package whisk.core.entity import java.time.Instant + +import scala.concurrent.Future +import scala.language.postfixOps +import scala.util.Try + import akka.actor.ActorSystem import spray.json.JsObject import spray.json.JsString @@ -38,9 +43,7 @@ import whisk.core.database.ArtifactStore import whisk.core.database.ArtifactStoreProvider import whisk.core.database.DocumentRevisionProvider import whisk.core.database.DocumentSerializer -import scala.concurrent.Future -import scala.language.postfixOps -import scala.util.Try +import whisk.core.database.StaleParameter import whisk.spi.SpiLoader package object types { @@ -127,7 +130,6 @@ object WhiskActivationStore { SpiLoader.get[ArtifactStoreProvider]().makeStore[WhiskActivation](config, _.dbActivations) } - /** * This object provides some utilities that query the whisk datastore. * The datastore is assumed to have views (pre-computed joins or indexes) @@ -183,12 +185,13 @@ object WhiskEntityQueries { def listAllInNamespace[A <: WhiskEntity]( db: ArtifactStore[A], namespace: EntityPath, - includeDocs: Boolean)( + includeDocs: Boolean, + stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId): Future[Map[String, List[JsObject]]] = { implicit val ec = db.executionContext val startKey = List(namespace.toString) val endKey = List(namespace.toString, TOP) - db.query(viewname(ALL), startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false) map { + db.query(viewname(ALL), startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false, stale = stale) map { _ map { row => val value = row.fields("value").asJsObject @@ -205,12 +208,13 @@ object WhiskEntityQueries { def listEntitiesInNamespace[A <: WhiskEntity]( db: ArtifactStore[A], namespace: EntityPath, - includeDocs: Boolean)( + includeDocs: Boolean, + stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId): Future[Map[String, List[JsObject]]] = { implicit val ec = db.executionContext val startKey = List(namespace.toString) val endKey = List(namespace.toString, TOP) - db.query(viewname(ENTITIES), startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false) map { + db.query(viewname(ENTITIES), startKey, endKey, 0, 0, includeDocs, descending = true, reduce = false, stale = stale) map { _ map { row => val value = row.fields("value").asJsObject @@ -228,11 +232,12 @@ object WhiskEntityQueries { reduce: Boolean, since: Option[Instant] = None, upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No, convert: Option[JsObject => Try[T]])( implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = { val startKey = List(since map { _.toEpochMilli } getOrElse 0) val endKey = List(upto map { _.toEpochMilli } getOrElse TOP, TOP) - query(db, viewname(collection, true), startKey, endKey, skip, limit, reduce, convert) + query(db, viewname(collection, true), startKey, endKey, skip, limit, reduce, stale, convert) } def listCollectionInNamespace[A <: WhiskEntity, T]( @@ -243,11 +248,12 @@ object WhiskEntityQueries { limit: Int, since: Option[Instant] = None, upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No, convert: Option[JsObject => Try[T]])( implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = { val startKey = List(namespace.toString, since map { _.toEpochMilli } getOrElse 0) val endKey = List(namespace.toString, upto map { _.toEpochMilli } getOrElse TOP, TOP) - query(db, viewname(collection), startKey, endKey, skip, limit, reduce = false, convert) + query(db, viewname(collection), startKey, endKey, skip, limit, reduce = false, stale, convert) } def listCollectionByName[A <: WhiskEntity, T]( @@ -259,11 +265,12 @@ object WhiskEntityQueries { limit: Int, since: Option[Instant] = None, upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No, convert: Option[JsObject => Try[T]])( implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = { val startKey = List(namespace.addPath(name).toString, since map { _.toEpochMilli } getOrElse 0) val endKey = List(namespace.addPath(name).toString, upto map { _.toEpochMilli } getOrElse TOP, TOP) - query(db, viewname(collection), startKey, endKey, skip, limit, reduce = false, convert) + query(db, viewname(collection), startKey, endKey, skip, limit, reduce = false, stale, convert) } private def query[A <: WhiskEntity, T]( @@ -274,11 +281,12 @@ object WhiskEntityQueries { skip: Int, limit: Int, reduce: Boolean, + stale: StaleParameter = StaleParameter.No, convert: Option[JsObject => Try[T]])( implicit transid: TransactionId): Future[Either[List[JsObject], List[T]]] = { implicit val ec = db.executionContext val includeDocs = convert.isDefined - db.query(view, startKey, endKey, skip, limit, includeDocs, true, reduce) map { + db.query(view, startKey, endKey, skip, limit, includeDocs, true, reduce, stale) map { rows => convert map { fn => Right(rows flatMap { row => fn(row.fields("doc").asJsObject) toOption }) @@ -310,10 +318,11 @@ trait WhiskEntityQueries[T] { docs: Boolean = false, reduce: Boolean = false, since: Option[Instant] = None, - upto: Option[Instant] = None)( + upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId) = { val convert = if (docs) Some((o: JsObject) => Try { serdes.read(o) }) else None - WhiskEntityQueries.listCollectionInAnyNamespace(db, collectionName, skip, limit, reduce, since, upto, convert) + WhiskEntityQueries.listCollectionInAnyNamespace(db, collectionName, skip, limit, reduce, since, upto, stale, convert) } def listCollectionInNamespace[A <: WhiskEntity, T]( @@ -323,10 +332,11 @@ trait WhiskEntityQueries[T] { limit: Int, docs: Boolean = false, since: Option[Instant] = None, - upto: Option[Instant] = None)( + upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId) = { val convert = if (docs) Some((o: JsObject) => Try { serdes.read(o) }) else None - WhiskEntityQueries.listCollectionInNamespace(db, collectionName, namespace, skip, limit, since, upto, convert) + WhiskEntityQueries.listCollectionInNamespace(db, collectionName, namespace, skip, limit, since, upto, stale, convert) } def listCollectionByName[A <: WhiskEntity, T]( @@ -337,9 +347,10 @@ trait WhiskEntityQueries[T] { limit: Int, docs: Boolean = false, since: Option[Instant] = None, - upto: Option[Instant] = None)( + upto: Option[Instant] = None, + stale: StaleParameter = StaleParameter.No)( implicit transid: TransactionId) = { val convert = if (docs) Some((o: JsObject) => Try { serdes.read(o) }) else None - WhiskEntityQueries.listCollectionByName(db, collectionName, namespace, name, skip, limit, since, upto, convert) + WhiskEntityQueries.listCollectionByName(db, collectionName, namespace, name, skip, limit, since, upto, stale, convert) } } diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala index e643b13..f45accf 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala @@ -25,23 +25,22 @@ import scala.util.Success import scala.util.Try import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller +import akka.http.scaladsl.model.StatusCodes.BadRequest import akka.http.scaladsl.server.Directives import akka.http.scaladsl.unmarshalling._ -import akka.http.scaladsl.model.StatusCodes.BadRequest - -import spray.json.DeserializationException -import spray.json.DefaultJsonProtocol.RootJsObjectFormat import spray.json._ - +import spray.json.DefaultJsonProtocol.RootJsObjectFormat +import spray.json.DeserializationException import whisk.common.TransactionId +import whisk.core.database.StaleParameter import whisk.core.entitlement.Collection import whisk.core.entitlement.Privilege.Privilege import whisk.core.entitlement.Privilege.READ import whisk.core.entitlement.Resource import whisk.core.entity._ import whisk.core.entity.types.ActivationStore -import whisk.http.Messages import whisk.http.ErrorResponse.terminate +import whisk.http.Messages object WhiskActivationsApi { protected[core] val maxActivationLimit = 200 @@ -122,16 +121,18 @@ trait WhiskActivationsApi if (cappedLimit <= WhiskActivationsApi.maxActivationLimit) { val activations = name match { case Some(action) => - WhiskActivation.listCollectionByName(activationStore, namespace, action, skip, cappedLimit, docs, since, upto) + WhiskActivation.listCollectionByName(activationStore, namespace, action, skip, cappedLimit, docs, since, upto, StaleParameter.UpdateAfter) case None => - WhiskActivation.listCollectionInNamespace(activationStore, namespace, skip, cappedLimit, docs, since, upto) + WhiskActivation.listCollectionInNamespace(activationStore, namespace, skip, cappedLimit, docs, since, upto, StaleParameter.UpdateAfter) } listEntities { activations map { - l => if (docs) l.right.get map { - _.toExtendedJson - } else l.left.get + l => + if (docs) l.right.get map { + _.toExtendedJson + } + else l.left.get } } } else { diff --git a/tests/src/test/scala/system/basic/WskConsoleTests.scala b/tests/src/test/scala/system/basic/WskConsoleTests.scala index aaa30b6..56d5ad0 100644 --- a/tests/src/test/scala/system/basic/WskConsoleTests.scala +++ b/tests/src/test/scala/system/basic/WskConsoleTests.scala @@ -72,7 +72,7 @@ class WskConsoleTests val console = wsk.activation.console(10 seconds, since = duration) println(console.stdout) console.stdout should include(payload) - } + } } it should "show repeated activations" in withAssetCleaner(wskprops) { diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala index 355c2ec..da1fed0 100644 --- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala +++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala @@ -38,6 +38,7 @@ import whisk.core.database.ArtifactStore import whisk.core.database.CouchDbRestClient import whisk.core.database.DocumentFactory import whisk.core.database.NoDocumentException +import whisk.core.database.StaleParameter import whisk.core.entity.AuthKey import whisk.core.entity.DocId import whisk.core.entity.DocInfo @@ -99,7 +100,7 @@ trait DbUtils extends TransactionCounter { val success = retry(() => { val startKey = List(namespace.toString) val endKey = List(namespace.toString, WhiskEntityQueries.TOP) - db.query(WhiskEntityQueries.viewname(WhiskEntityQueries.ALL), startKey, endKey, 0, 0, false, true, false) map { l => + db.query(WhiskEntityQueries.viewname(WhiskEntityQueries.ALL), startKey, endKey, 0, 0, false, true, false, StaleParameter.No) map { l => if (l.length != count) { throw RetryOp() } else true diff --git a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala index 7602b9d..4000ac2 100644 --- a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala @@ -37,6 +37,7 @@ import spray.json.JsObject import whisk.core.WhiskConfig import whisk.core.controller.test.WhiskAuthHelpers import whisk.core.database.ArtifactStore +import whisk.core.database.StaleParameter import whisk.core.database.test.DbUtils import whisk.core.entity._ import whisk.core.entity.WhiskEntityQueries._ @@ -87,7 +88,7 @@ class ViewTests extends FlatSpec def getAllInNamespace[Au <: WhiskEntity](store: ArtifactStore[Au], ns: EntityPath)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listAllInNamespace(store, ns, false), dbOpTimeout).values.toList.flatten + val result = Await.result(listAllInNamespace(store, ns, false, StaleParameter.No), dbOpTimeout).values.toList.flatten val expected = entities filter { _.namespace.root.toPath == ns } result.length should be(expected.length) result should contain theSameElementsAs expected.map(_.summaryAsJson) @@ -105,7 +106,7 @@ class ViewTests extends FlatSpec def getKindInNamespace[Au <: WhiskEntity](store: ArtifactStore[Au], ns: EntityPath, kind: String, f: (WhiskEntity) => Boolean)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listCollectionInNamespace(store, kind, ns, 0, 0, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) + val result = Await.result(listCollectionInNamespace(store, kind, ns, 0, 0, stale = StaleParameter.No, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } result.length should be(expected.length) expected forall { e => result contains e.summaryAsJson } should be(true) @@ -121,7 +122,7 @@ class ViewTests extends FlatSpec def getKindInNamespaceWithDoc[T](ns: EntityPath, kind: String, f: (WhiskEntity) => Boolean, convert: Option[JsObject => Try[T]])(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listCollectionInNamespace(entityStore, kind, ns, 0, 0, convert = convert) map { _.right.get }, dbOpTimeout) + val result = Await.result(listCollectionInNamespace(entityStore, kind, ns, 0, 0, stale = StaleParameter.No, convert = convert) map { _.right.get }, dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } result.length should be(expected.length) expected forall { e => result contains e } should be(true) @@ -129,7 +130,7 @@ class ViewTests extends FlatSpec def getKindInNamespaceByName[Au <: WhiskEntity](store: ArtifactStore[Au], ns: EntityPath, kind: String, name: EntityName, f: (WhiskEntity) => Boolean)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listCollectionByName(store, kind, ns, name, 0, 0, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) + val result = Await.result(listCollectionByName(store, kind, ns, name, 0, 0, stale = StaleParameter.No, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } result.length should be(expected.length) expected forall { e => result contains e.summaryAsJson } should be(true) @@ -137,7 +138,7 @@ class ViewTests extends FlatSpec def getKindInPackage(ns: EntityPath, kind: String, f: (WhiskEntity) => Boolean)(implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listCollectionInNamespace(entityStore, kind, ns, 0, 0, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) + val result = Await.result(listCollectionInNamespace(entityStore, kind, ns, 0, 0, stale = StaleParameter.No, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace == ns } result.length should be(expected.length) expected forall { e => result contains e.summaryAsJson } should be(true) @@ -147,7 +148,7 @@ class ViewTests extends FlatSpec store: ArtifactStore[Au], ns: EntityPath, kind: String, name: EntityName, skip: Int, count: Int, start: Option[Instant], end: Option[Instant], f: (WhiskEntity) => Boolean)( implicit entities: Seq[WhiskEntity]) = { implicit val tid = transid() - val result = Await.result(listCollectionByName(store, kind, ns, name, skip, count, start, end, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) + val result = Await.result(listCollectionByName(store, kind, ns, name, skip, count, start, end, StaleParameter.No, convert = None) map { _.left.get map { e => e } }, dbOpTimeout) val expected = entities filter { e => f(e) && e.namespace.root.toPath == ns } sortBy { case (e: WhiskActivation) => e.start.toEpochMilli; case _ => 0 } map { _.summaryAsJson } result.length should be(expected.length) result should be(expected reverse) -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].