This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 9be13e2 Reduce memory consumption for action invocation (#2730) 9be13e2 is described below commit 9be13e2c6d581f42b60351f05a11e4aa71ca31d9 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Thu Nov 9 09:55:19 2017 -0500 Reduce memory consumption for action invocation (#2730) --- .../core/database/ArtifactStoreProvider.scala | 2 + .../whisk/core/database/CouchDbRestStore.scala | 12 +- .../whisk/core/database/CouchDbStoreProvider.scala | 2 + .../core/database/RemoteCacheInvalidation.scala | 5 + .../DocumentReader.scala} | 22 +-- .../src/main/scala/whisk/core/entity/Exec.scala | 155 ++++++++++++++++++++- .../main/scala/whisk/core/entity/WhiskAction.scala | 147 +++++++++++++++++++ .../main/scala/whisk/core/entity/WhiskEntity.scala | 14 ++ .../main/scala/whisk/core/entity/WhiskStore.scala | 11 +- .../src/main/scala/whisk/http/ErrorResponse.scala | 7 + .../main/scala/whisk/core/controller/Actions.scala | 16 ++- .../scala/whisk/core/controller/Controller.scala | 5 +- .../scala/whisk/core/controller/WebActions.scala | 14 +- .../controller/actions/PostActionActivation.scala | 4 +- .../core/controller/actions/PrimitiveActions.scala | 2 +- .../core/controller/actions/SequenceActions.scala | 16 +-- .../scala/whisk/core/entitlement/Entitlement.scala | 4 + .../core/loadBalancer/LoadBalancerService.scala | 12 +- .../core/controller/test/ActionsApiTests.scala | 13 ++ .../core/controller/test/ActivationsApiTests.scala | 7 +- .../controller/test/ControllerTestCommon.scala | 2 +- .../core/controller/test/WebActionsApiTests.scala | 8 +- .../scala/whisk/core/entity/test/ExecHelpers.scala | 9 ++ 23 files changed, 432 insertions(+), 57 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala index 2080d47..e8cac18 100644 --- 
a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala @@ -23,6 +23,7 @@ import spray.json.RootJsonFormat import whisk.common.Logging import whisk.core.WhiskConfig import whisk.spi.Spi +import whisk.core.entity.DocumentReader /** * An Spi for providing ArtifactStore implementations @@ -32,6 +33,7 @@ trait ArtifactStoreProvider extends Spi { name: WhiskConfig => String, useBatching: Boolean = false)( implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, actorSystem: ActorSystem, logging: Logging, materializer: ActorMaterializer): ArtifactStore[D] diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index 0e66aee..298cf09 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -36,6 +36,7 @@ import whisk.core.entity.DocInfo import whisk.core.entity.DocRevision import whisk.core.entity.WhiskDocument import whisk.http.Messages +import whisk.core.entity.DocumentReader /** * Basic client to put and delete artifacts in a data store. 
@@ -58,7 +59,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction], - materializer: ActorMaterializer) + materializer: ActorMaterializer, + docReader: DocumentReader) extends ArtifactStore[DocumentAbstraction] with DefaultJsonProtocol { @@ -223,7 +225,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St e match { case Right(response) => transid.finished(this, start, s"[GET] '$dbName' completed: found document '$doc'") - val asFormat = jsonFormat.read(response) + + val asFormat = try { + docReader.read(ma, response) + } catch { + case e: Exception => jsonFormat.read(response) + } + if (asFormat.getClass != ma.runtimeClass) { throw DocumentTypeMismatchException( s"document type ${asFormat.getClass} did not match expected type ${ma.runtimeClass}.") diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala index d2c08dd..08b915e 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala @@ -22,11 +22,13 @@ import akka.stream.ActorMaterializer import spray.json.RootJsonFormat import whisk.common.Logging import whisk.core.WhiskConfig +import whisk.core.entity.DocumentReader object CouchDbStoreProvider extends ArtifactStoreProvider { def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String, useBatching: Boolean)( implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, actorSystem: ActorSystem, logging: Logging, materializer: ActorMaterializer): ArtifactStore[D] = { diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala index 
426b602..b615708 100644 --- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala +++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala @@ -36,6 +36,7 @@ import whisk.core.connector.MessagingProvider import whisk.core.entity.CacheKey import whisk.core.entity.InstanceId import whisk.core.entity.WhiskAction +import whisk.core.entity.WhiskActionMetaData import whisk.core.entity.WhiskPackage import whisk.core.entity.WhiskRule import whisk.core.entity.WhiskTrigger @@ -76,12 +77,16 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: removeFromLocalCache) }) + def invalidateWhiskActionMetaData(key: CacheKey) = + WhiskActionMetaData.removeId(key) + private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future { val raw = new String(bytes, StandardCharsets.UTF_8) CacheInvalidationMessage.parse(raw) match { case Success(msg: CacheInvalidationMessage) => { if (msg.instanceId != instanceId) { + WhiskActionMetaData.removeId(msg.key) WhiskAction.removeId(msg.key) WhiskPackage.removeId(msg.key) WhiskRule.removeId(msg.key) diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/entity/DocumentReader.scala similarity index 54% copy from common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala copy to common/scala/src/main/scala/whisk/core/entity/DocumentReader.scala index 2080d47..ec53cf7 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/entity/DocumentReader.scala @@ -15,24 +15,10 @@ * limitations under the License. 
*/ -package whisk.core.database +package whisk.core.entity -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import spray.json.RootJsonFormat -import whisk.common.Logging -import whisk.core.WhiskConfig -import whisk.spi.Spi +import spray.json._ -/** - * An Spi for providing ArtifactStore implementations - */ -trait ArtifactStoreProvider extends Spi { - def makeStore[D <: DocumentSerializer](config: WhiskConfig, - name: WhiskConfig => String, - useBatching: Boolean = false)( - implicit jsonFormat: RootJsonFormat[D], - actorSystem: ActorSystem, - logging: Logging, - materializer: ActorMaterializer): ArtifactStore[D] +protected[core] abstract class DocumentReader { + def read[A](ma: Manifest[A], value: JsValue): WhiskDocument } diff --git a/common/scala/src/main/scala/whisk/core/entity/Exec.scala b/common/scala/src/main/scala/whisk/core/entity/Exec.scala index 49ede1d..3ea1ec5 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Exec.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Exec.scala @@ -45,7 +45,7 @@ import whisk.core.entity.size.SizeString * main : name of the entry point function, when using a non-default value (for Java, the name of the main class)" } */ sealed abstract class Exec extends ByteSizeable { - override def toString = Exec.serdes.write(this).compactPrint + override def toString: String = Exec.serdes.write(this).compactPrint /** A type descriptor. */ val kind: String @@ -54,6 +54,10 @@ sealed abstract class Exec extends ByteSizeable { val deprecated: Boolean } +sealed abstract class ExecMetaDataBase extends Exec { + override def toString: String = ExecMetaDataBase.serdes.write(this).compactPrint +} + /** * A common super class for all action exec types that contain their executable * code explicitly (i.e., any action other than a sequence). 
@@ -89,6 +93,14 @@ sealed abstract class CodeExec[+T <% SizeConversion] extends Exec { override def size = code.sizeInBytes + entryPoint.map(_.sizeInBytes).getOrElse(0.B) } +sealed abstract class ExecMetaData extends ExecMetaDataBase { + + /** Indicates if a container image is required from the registry to execute the action. */ + val pull: Boolean + + override def size = 0.B +} + protected[core] case class CodeExecAsString(manifest: RuntimeManifest, override val code: String, override val entryPoint: Option[String]) @@ -102,6 +114,12 @@ protected[core] case class CodeExecAsString(manifest: RuntimeManifest, override def codeAsJson = JsString(code) } +protected[core] case class CodeExecMetaDataAsString(manifest: RuntimeManifest) extends ExecMetaData { + override val kind = manifest.kind + override val deprecated = manifest.deprecated.getOrElse(false) + override val pull = false +} + protected[core] case class CodeExecAsAttachment(manifest: RuntimeManifest, override val code: Attachment[String], override val entryPoint: Option[String]) @@ -126,6 +144,12 @@ protected[core] case class CodeExecAsAttachment(manifest: RuntimeManifest, } } +protected[core] case class CodeExecMetaDataAsAttachment(manifest: RuntimeManifest) extends ExecMetaData { + override val kind = manifest.kind + override val deprecated = manifest.deprecated.getOrElse(false) + override val pull = false +} + /** * @param image the image name * @param code an optional script or zip archive (as base64 encoded) string @@ -144,12 +168,24 @@ protected[core] case class BlackBoxExec(override val image: ImageName, override def size = super.size + image.publicImageName.sizeInBytes } +protected[core] case class BlackBoxExecMetaData(val native: Boolean) extends ExecMetaData { + override val kind = ExecMetaDataBase.BLACKBOX + override val deprecated = false + override val pull = !native +} + protected[core] case class SequenceExec(components: Vector[FullyQualifiedEntityName]) extends Exec { override val kind = 
Exec.SEQUENCE override val deprecated = false override def size = components.map(_.size).reduceOption(_ + _).getOrElse(0.B) } +protected[core] case class SequenceExecMetaData(components: Vector[FullyQualifiedEntityName]) extends ExecMetaDataBase { + override val kind = ExecMetaDataBase.SEQUENCE + override val deprecated = false + override def size = components.map(_.size).reduceOption(_ + _).getOrElse(0.B) +} + protected[core] object Exec extends ArgNormalizer[Exec] with DefaultJsonProtocol { val sizeLimit = 48 MB @@ -187,6 +223,7 @@ protected[core] object Exec extends ArgNormalizer[Exec] with DefaultJsonProtocol val code = b.code.filter(_.trim.nonEmpty).map("code" -> JsString(_)) val main = b.entryPoint.map("main" -> JsString(_)) JsObject(base ++ code ++ main) + case _ => JsObject() } override def read(v: JsValue) = { @@ -278,3 +315,119 @@ protected[core] object Exec extends ArgNormalizer[Exec] with DefaultJsonProtocol } else false } } + +protected[core] object ExecMetaDataBase extends ArgNormalizer[ExecMetaDataBase] with DefaultJsonProtocol { + + val sizeLimit = 48 MB + + // The possible values of the JSON 'kind' field for certain runtimes: + // - Sequence because it is an intrinsic + // - Black Box because it is a type marker + protected[core] val SEQUENCE = "sequence" + protected[core] val BLACKBOX = "blackbox" + + private def execManifests = ExecManifest.runtimesManifest + + override protected[core] implicit lazy val serdes = new RootJsonFormat[ExecMetaDataBase] { + private def attFmt[T: JsonFormat] = Attachments.serdes[T] + private lazy val runtimes: Set[String] = execManifests.knownContainerRuntimes ++ Set(SEQUENCE, BLACKBOX) + + override def write(e: ExecMetaDataBase) = e match { + case c: CodeExecMetaDataAsString => + val base = Map("kind" -> JsString(c.kind)) + JsObject(base) + + case a: CodeExecMetaDataAsAttachment => + val base = + Map("kind" -> JsString(a.kind)) + JsObject(base) + + case s @ SequenceExecMetaData(comp) => + JsObject("kind" -> 
JsString(s.kind), "components" -> comp.map(_.qualifiedNameWithLeadingSlash).toJson) + + case b: BlackBoxExecMetaData => + val base = + Map("kind" -> JsString(b.kind)) + JsObject(base) + } + + override def read(v: JsValue) = { + require(v != null) + + val obj = v.asJsObject + + val kind = obj.fields.get("kind") match { + case Some(JsString(k)) => k.trim.toLowerCase + case _ => throw new DeserializationException("'kind' must be a string defined in 'exec'") + } + + lazy val optMainField: Option[String] = obj.fields.get("main") match { + case Some(JsString(m)) => Some(m) + case Some(_) => + throw new DeserializationException(s"if defined, 'main' be a string in 'exec' for '$kind' actions") + case None => None + } + + kind match { + case ExecMetaDataBase.SEQUENCE => + val comp: Vector[FullyQualifiedEntityName] = obj.fields.get("components") match { + case Some(JsArray(components)) => components map (FullyQualifiedEntityName.serdes.read(_)) + case Some(_) => throw new DeserializationException(s"'components' must be an array") + case None => throw new DeserializationException(s"'components' must be defined for sequence kind") + } + SequenceExecMetaData(comp) + + case ExecMetaDataBase.BLACKBOX => + val image: ImageName = obj.fields.get("image") match { + case Some(JsString(i)) => ImageName.fromString(i).get // throws deserialization exception on failure + case _ => + throw new DeserializationException( + s"'image' must be a string defined in 'exec' for '${Exec.BLACKBOX}' actions") + } + val code: Option[String] = obj.fields.get("code") match { + case Some(JsString(i)) => if (i.trim.nonEmpty) Some(i) else None + case Some(_) => + throw new DeserializationException( + s"if defined, 'code' must a string defined in 'exec' for '${Exec.BLACKBOX}' actions") + case None => None + } + val native = execManifests.blackboxImages.contains(image) + BlackBoxExecMetaData(native) + + case _ => + // map "default" virtual runtime versions to the currently blessed actual runtime version + val 
manifest = execManifests.resolveDefaultRuntime(kind) match { + case Some(k) => k + case None => throw new DeserializationException(s"kind '$kind' not in $runtimes") + } + + manifest.attached + .map { a => + val jar: Attachment[String] = { + // java actions once stored the attachment in "jar" instead of "code" + obj.fields.get("code").orElse(obj.fields.get("jar")) + } map { + attFmt[String].read(_) + } getOrElse { + throw new DeserializationException( + s"'code' must be a valid base64 string in 'exec' for '$kind' actions") + } + val main = optMainField.orElse { + if (manifest.requireMain.exists(identity)) { + throw new DeserializationException(s"'main' must be a string defined in 'exec' for '$kind' actions") + } else None + } + CodeExecMetaDataAsAttachment(manifest) + } + .getOrElse { + val code: String = obj.fields.get("code") match { + case Some(JsString(c)) => c + case _ => + throw new DeserializationException(s"'code' must be a string defined in 'exec' for '$kind' actions") + } + CodeExecMetaDataAsString(manifest) + } + } + } + } +} diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala index c90367e..41af1aa 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala @@ -100,6 +100,10 @@ abstract class WhiskActionLike(override val name: EntityName) extends WhiskEntit "annotations" -> annotations.toJson) } +abstract class WhiskActionLikeMetaData(override val name: EntityName) extends WhiskActionLike(name) { + override def exec: ExecMetaDataBase +} + /** * A WhiskAction provides an abstraction of the meta-data * for a whisk action. 
@@ -161,6 +165,51 @@ case class WhiskAction(namespace: EntityPath, } } +@throws[IllegalArgumentException] +case class WhiskActionMetaData(namespace: EntityPath, + override val name: EntityName, + exec: ExecMetaDataBase, + parameters: Parameters = Parameters(), + limits: ActionLimits = ActionLimits(), + version: SemVer = SemVer(), + publish: Boolean = false, + annotations: Parameters = Parameters()) + extends WhiskActionLikeMetaData(name) { + + require(exec != null, "exec undefined") + require(limits != null, "limits undefined") + + /** + * Merges parameters (usually from package) with existing action parameters. + * Existing parameters supersede those in p. + */ + def inherit(p: Parameters) = copy(parameters = p ++ parameters).revision[WhiskActionMetaData](rev) + + /** + * Resolves sequence components if they contain default namespace. + */ + protected[core] def resolve(userNamespace: EntityName): WhiskActionMetaData = { + exec match { + case SequenceExecMetaData(components) => + val newExec = SequenceExecMetaData(components map { c => + FullyQualifiedEntityName(c.path.resolveNamespace(userNamespace), c.name) + }) + copy(exec = newExec).revision[WhiskActionMetaData](rev) + case _ => this + } + } + + def toExecutableWhiskAction = exec match { + case execMetaData: ExecMetaData => + Some( + ExecutableWhiskActionMetaData(namespace, name, execMetaData, parameters, limits, version, publish, annotations) + .revision[ExecutableWhiskActionMetaData](rev)) + case _ => + None + } + +} + /** * Variant of WhiskAction which only includes information necessary to be * executed by an Invoker. 
@@ -206,6 +255,25 @@ case class ExecutableWhiskAction(namespace: EntityPath, WhiskAction(namespace, name, exec, parameters, limits, version, publish, annotations).revision[WhiskAction](rev) } +@throws[IllegalArgumentException] +case class ExecutableWhiskActionMetaData(namespace: EntityPath, + override val name: EntityName, + exec: ExecMetaData, + parameters: Parameters = Parameters(), + limits: ActionLimits = ActionLimits(), + version: SemVer = SemVer(), + publish: Boolean = false, + annotations: Parameters = Parameters()) + extends WhiskActionLikeMetaData(name) { + + require(exec != null, "exec undefined") + require(limits != null, "limits undefined") + + def toWhiskAction = + WhiskActionMetaData(namespace, name, exec, parameters, limits, version, publish, annotations) + .revision[WhiskActionMetaData](rev) +} + object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[WhiskAction] with DefaultJsonProtocol { val execFieldName = "exec" @@ -342,6 +410,85 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[ } } +object WhiskActionMetaData + extends DocumentFactory[WhiskActionMetaData] + with WhiskEntityQueries[WhiskActionMetaData] + with DefaultJsonProtocol { + + val execFieldName = "exec" + val finalParamsAnnotationName = "final" + + override val collectionName = "actions" + + override implicit val serdes = jsonFormat( + WhiskActionMetaData.apply, + "namespace", + "name", + "exec", + "parameters", + "limits", + "version", + "publish", + "annotations") + + override val cacheEnabled = true + + /** + * Resolves an action name if it is contained in a package. + * Look up the package to determine if it is a binding or the actual package. + * If it's a binding, rewrite the fully qualified name of the action using the actual package path name. + * If it's the actual package, use its name directly as the package path name. 
+ */ + def resolveAction(db: EntityStore, fullyQualifiedActionName: FullyQualifiedEntityName)( + implicit ec: ExecutionContext, + transid: TransactionId): Future[FullyQualifiedEntityName] = { + // first check that there is a package to be resolved + val entityPath = fullyQualifiedActionName.path + if (entityPath.defaultPackage) { + // this is the default package, nothing to resolve + Future.successful(fullyQualifiedActionName) + } else { + // there is a package to be resolved + val pkgDocId = fullyQualifiedActionName.path.toDocId + val actionName = fullyQualifiedActionName.name + WhiskPackage.resolveBinding(db, pkgDocId) map { + _.fullyQualifiedName(withVersion = false).add(actionName) + } + } + } + + /** + * Resolves an action name if it is contained in a package. + * Look up the package to determine if it is a binding or the actual package. + * If it's a binding, rewrite the fully qualified name of the action using the actual package path name. + * If it's the actual package, use its name directly as the package path name. + * While traversing the package bindings, merge the parameters. 
+ */ + def resolveActionAndMergeParameters(entityStore: EntityStore, fullyQualifiedName: FullyQualifiedEntityName)( + implicit ec: ExecutionContext, + transid: TransactionId): Future[WhiskActionMetaData] = { + // first check that there is a package to be resolved + val entityPath = fullyQualifiedName.path + if (entityPath.defaultPackage) { + // this is the default package, nothing to resolve + WhiskActionMetaData.get(entityStore, fullyQualifiedName.toDocId) + } else { + // there is a package to be resolved + val pkgDocid = fullyQualifiedName.path.toDocId + val actionName = fullyQualifiedName.name + val wp = WhiskPackage.resolveBinding(entityStore, pkgDocid, mergeParameters = true) + wp flatMap { resolvedPkg => + // fully resolved name for the action + val fqnAction = resolvedPkg.fullyQualifiedName(withVersion = false).add(actionName) + // get the whisk action associate with it and inherit the parameters from the package/binding + WhiskActionMetaData.get(entityStore, fqnAction.toDocId) map { + _.inherit(resolvedPkg.parameters) + } + } + } + } +} + object ActionLimitsOption extends DefaultJsonProtocol { implicit val serdes = jsonFormat3(ActionLimitsOption.apply) } diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskEntity.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskEntity.scala index 166b4c8..eafb7d5 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskEntity.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskEntity.scala @@ -106,6 +106,20 @@ object WhiskEntity { } } +object WhiskDocumentReader extends DocumentReader { + override def read[A](ma: Manifest[A], value: JsValue) = { + ma.runtimeClass match { + case x if x == classOf[WhiskAction] => WhiskAction.serdes.read(value) + case x if x == classOf[WhiskActionMetaData] => WhiskActionMetaData.serdes.read(value) + case x if x == classOf[WhiskPackage] => WhiskPackage.serdes.read(value) + case x if x == classOf[WhiskActivation] => WhiskActivation.serdes.read(value) + case 
x if x == classOf[WhiskTrigger] => WhiskTrigger.serdes.read(value) + case x if x == classOf[WhiskRule] => WhiskRule.serdes.read(value) + case _ => throw DocumentUnreadable(Messages.corruptedEntity) + } + } +} + /** * Dispatches to appropriate serdes. This object is not itself implicit so as to * avoid multiple implicit alternatives when working with one of the subtypes. diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index 28addc2..b114450 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -89,6 +89,8 @@ protected[core] trait WhiskDocument extends DocumentSerializer with DocumentRevi } object WhiskAuthStore { + implicit val docReader = WhiskDocumentReader + def requiredProperties = Map( dbProvider -> null, @@ -118,11 +120,16 @@ object WhiskEntityStore { def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = SpiLoader .get[ArtifactStoreProvider] - .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging, materializer) - + .makeStore[WhiskEntity](config, _.dbWhisk)( + WhiskEntityJsonFormat, + WhiskDocumentReader, + system, + logging, + materializer) } object WhiskActivationStore { + implicit val docReader = WhiskDocumentReader def requiredProperties = Map( dbProvider -> null, diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index 719a6c5..5130578 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -35,6 +35,7 @@ import whisk.common.TransactionId import whisk.core.entity.SizeError import whisk.core.entity.ByteSize import whisk.core.entity.Exec +import whisk.core.entity.ExecMetaDataBase import whisk.core.entity.ActivationId 
object Messages { @@ -55,6 +56,12 @@ object Messages { def runtimeDeprecated(e: Exec) = s"The '${e.kind}' runtime is no longer supported. You may read and delete but not update or invoke this action." + /** + * Standard message for reporting deprecated runtimes. + */ + def runtimeDeprecated(e: ExecMetaDataBase) = + s"The '${e.kind}' runtime is no longer supported. You may read and delete but not update or invoke this action." + /** Standard message for resource not found. */ val resourceDoesNotExist = "The requested resource does not exist." diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala index fe1ebe6..fcdea99 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala @@ -222,11 +222,11 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with 'result ? false, 'timeout.as[FiniteDuration] ? 
WhiskActionsApi.maxWaitForBlockingActivation) { (blocking, result, waitOverride) => entity(as[Option[JsObject]]) { payload => - getEntity(WhiskAction, entityStore, entityName.toDocId, Some { - act: WhiskAction => + getEntity(WhiskActionMetaData, entityStore, entityName.toDocId, Some { + act: WhiskActionMetaData => // resolve the action --- special case for sequences that may contain components with '_' as default package val action = act.resolve(user.namespace) - onComplete(entitleReferencedEntities(user, Privilege.ACTIVATE, Some(action.exec))) { + onComplete(entitleReferencedEntitiesMetaData(user, Privilege.ACTIVATE, Some(action.exec))) { case Success(_) => val actionWithMergedParams = env.map(action.inherit(_)) getOrElse action val waitForResponse = if (blocking) Some(waitOverride) else None @@ -386,6 +386,16 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with } } + private def entitleReferencedEntitiesMetaData(user: Identity, right: Privilege, exec: Option[ExecMetaDataBase])( + implicit transid: TransactionId) = { + exec match { + case Some(seq: SequenceExecMetaData) => + logging.info(this, "checking if sequence components are accessible") + entitlementProvider.check(user, right, referencedEntities(seq)) + case _ => Future.successful(true) + } + } + /** Creates a WhiskAction from PUT content, generating default values where necessary. 
*/ private def make(user: Identity, entityName: FullyQualifiedEntityName, content: WhiskActionPut)( implicit transid: TransactionId) = { diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index bb723bc..cc140b1 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -110,7 +110,10 @@ class Controller(val instance: InstanceId, private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig) private implicit val cacheChangeNotification = Some(new CacheChangeNotification { val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance) - override def apply(k: CacheKey) = remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k) + override def apply(k: CacheKey) = { + remoteCacheInvalidaton.invalidateWhiskActionMetaData(k) + remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k) + } }) // initialize backend services diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala index 0a12036..8cfb1e9 100644 --- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala @@ -449,8 +449,8 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc * This method is factored out to allow mock testing. 
*/ protected def getAction(actionName: FullyQualifiedEntityName)( - implicit transid: TransactionId): Future[WhiskAction] = { - WhiskAction.get(entityStore, actionName.toDocId) + implicit transid: TransactionId): Future[WhiskActionMetaData] = { + WhiskActionMetaData.get(entityStore, actionName.toDocId) } /** @@ -549,7 +549,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc } private def extractEntityAndProcessRequest(actionOwnerIdentity: Identity, - action: WhiskAction, + action: WhiskActionMetaData, extension: MediaExtension, onBehalfOf: Option[Identity], context: Context, @@ -597,7 +597,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc } private def processRequest(actionOwnerIdentity: Identity, - action: WhiskAction, + action: WhiskActionMetaData, responseType: MediaExtension, onBehalfOf: Option[Identity], context: Context, @@ -694,7 +694,7 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc * @return future action document or NotFound rejection */ private def actionLookup(actionName: FullyQualifiedEntityName)( - implicit transid: TransactionId): Future[WhiskAction] = { + implicit transid: TransactionId): Future[WhiskActionMetaData] = { getAction(actionName) recoverWith { case _: ArtifactStoreException | DeserializationException(_, _, _) => Future.failed(RejectRequest(NotFound)) @@ -717,8 +717,8 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc /** * Checks if an action is exported (i.e., carries the required annotation). 
*/ - private def confirmExportedAction(actionLookup: Future[WhiskAction], authenticated: Boolean)( - implicit transid: TransactionId): Future[WhiskAction] = { + private def confirmExportedAction(actionLookup: Future[WhiskActionMetaData], authenticated: Boolean)( + implicit transid: TransactionId): Future[WhiskActionMetaData] = { actionLookup flatMap { action => val requiresAuthenticatedUser = action.annotations.asBool("require-whisk-auth").exists(identity) val isExported = action.annotations.asBool("web-export").exists(identity) diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala index 34b3bb2..5548229 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala @@ -46,14 +46,14 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc */ protected[controller] def invokeAction( user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { action.toExecutableWhiskAction match { // this is a topmost sequence case None => - val SequenceExec(components) = action.exec + val SequenceExecMetaData(components) = action.exec invokeSequence(user, action, components, payload, waitForResponse, cause, topmost = true, 0).map(r => r._1) // a non-deprecated ExecutableWhiskAction case Some(executable) if !executable.exec.deprecated => diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index 2a9fa1f..f604e63 100644 --- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -92,7 +92,7 @@ protected[actions] trait PrimitiveActions { */ protected[actions] def invokeSingleAction( user: Identity, - action: ExecutableWhiskAction, + action: ExecutableWhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index 92b2b80..8df2033 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -63,7 +63,7 @@ protected[actions] trait SequenceActions { /** A method that knows how to invoke a single primitive action. 
*/ protected[actions] def invokeAction( user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] @@ -84,7 +84,7 @@ protected[actions] trait SequenceActions { */ protected[actions] def invokeSequence( user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, components: Vector[FullyQualifiedEntityName], payload: Option[JsObject], waitForOutermostResponse: Option[FiniteDuration], @@ -146,7 +146,7 @@ protected[actions] trait SequenceActions { private def completeSequenceActivation(seqActivationId: ActivationId, futureSeqResult: Future[SequenceAccounting], user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, topmost: Boolean, start: Instant, cause: Option[ActivationId])( @@ -188,7 +188,7 @@ protected[actions] trait SequenceActions { * Creates an activation for a sequence. */ private def makeSequenceActivation(user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, activationId: ActivationId, accounting: SequenceAccounting, topmost: Boolean, @@ -248,7 +248,7 @@ protected[actions] trait SequenceActions { */ private def invokeSequenceComponents( user: Identity, - seqAction: WhiskAction, + seqAction: WhiskActionMetaData, seqActivationId: ActivationId, inputPayload: Option[JsObject], components: Vector[FullyQualifiedEntityName], @@ -264,7 +264,7 @@ protected[actions] trait SequenceActions { // This action/parameter resolution is done in futures; the execution starts as soon as the first component // is resolved. 
val resolvedFutureActions = resolveDefaultNamespace(components, user) map { c => - WhiskAction.resolveActionAndMergeParameters(entityStore, c) + WhiskActionMetaData.resolveActionAndMergeParameters(entityStore, c) } // this holds the initial value of the accounting structure, including the input boxed as an ActivationResponse @@ -314,7 +314,7 @@ protected[actions] trait SequenceActions { */ private def invokeNextAction( user: Identity, - futureAction: Future[WhiskAction], + futureAction: Future[WhiskActionMetaData], accounting: SequenceAccounting, cause: Option[ActivationId])(implicit transid: TransactionId): Future[SequenceAccounting] = { futureAction.flatMap { action => @@ -327,7 +327,7 @@ protected[actions] trait SequenceActions { // invoke the action by calling the right method depending on whether it's an atomic action or a sequence val futureWhiskActivationTuple = action.toExecutableWhiskAction match { case None => - val SequenceExec(components) = action.exec + val SequenceExecMetaData(components) = action.exec logging.info(this, s"sequence invoking an enclosed sequence $action") // call invokeSequence to invoke the inner sequence; this is a blocking activation by definition invokeSequence( diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala index a2fc213..9afa83a 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala @@ -325,6 +325,10 @@ trait ReferencedEntities { e.components.map { c => Resource(c.path, Collection(Collection.ACTIONS), Some(c.name.asString)) }.toSet + case e: SequenceExecMetaData => + e.components.map { c => + Resource(c.path, Collection(Collection.ACTIONS), Some(c.name.asString)) + }.toSet case _ => Set() } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 0b5a06d..4237ed6 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -46,7 +46,7 @@ import whisk.core.connector.MessagingProvider import whisk.core.database.NoDocumentException import whisk.core.entity.{ActivationId, WhiskActivation} import whisk.core.entity.EntityName -import whisk.core.entity.ExecutableWhiskAction +import whisk.core.entity.ExecutableWhiskActionMetaData import whisk.core.entity.Identity import whisk.core.entity.InstanceId import whisk.core.entity.UUID @@ -76,7 +76,7 @@ trait LoadBalancer { * The future is guaranteed to complete within the declared action time limit * plus a grace period (see activeAckTimeoutGrace). */ - def publish(action: ExecutableWhiskAction, msg: ActivationMessage)( + def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] } @@ -110,7 +110,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore override def totalActiveActivations = loadBalancerData.totalActivationCount - override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)( + override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { chooseInvoker(msg.user, action).flatMap { invokerName => val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid) @@ -173,7 +173,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore /** * Creates an activation entry and insert into various maps. 
*/ - private def setupActivation(action: ExecutableWhiskAction, + private def setupActivation(action: ExecutableWhiskActionMetaData, activationId: ActivationId, namespaceId: UUID, invokerName: InstanceId, @@ -312,7 +312,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. */ - private def chooseInvoker(user: Identity, action: ExecutableWhiskAction): Future[InstanceId] = { + private def chooseInvoker(user: Identity, action: ExecutableWhiskActionMetaData): Future[InstanceId] = { val hash = generateHash(user.namespace, action) loadBalancerData.activationCountPerInvoker.flatMap { currentActivations => @@ -334,7 +334,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore } /** Generates a hash based on the string representation of namespace and action */ - private def generateHash(namespace: EntityName, action: ExecutableWhiskAction): Int = { + private def generateHash(namespace: EntityName, action: ExecutableWhiskActionMetaData): Int = { (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs } } diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala index 6e65a80..aa60a25 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala @@ -812,6 +812,19 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { } } + it should "ensure WhiskActionMetadata is used to invoke an action" in { + implicit val tid = transid() + val action = WhiskAction(namespace, aname(), jsDefault("??")) + put(entityStore, action) + Post(s"$collectionPath/${action.name}") ~> Route.seal(routes(creds)) ~> check { + status should be(Accepted) + val response = 
responseAs[JsObject] + response.fields("activationId") should not be None + } + stream.toString should include(s"[WhiskActionMetaData] [GET] serving from datastore: ${CacheKey(action)}") + stream.reset() + } + it should "report proper error when record is corrupted on delete" in { implicit val tid = transid() val entity = BadEntity(namespace, aname()) diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala index 9f99eda..5341836 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala @@ -491,7 +491,12 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi implicit val materializer = ActorMaterializer() val activationStore = SpiLoader .get[ArtifactStoreProvider] - .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging, materializer) + .makeStore[WhiskEntity](whiskConfig, _.dbActivations)( + WhiskEntityJsonFormat, + WhiskDocumentReader, + system, + logging, + materializer) implicit val tid = transid() val entity = BadEntity(namespace, EntityName(ActivationId().toString)) put(activationStore, entity) diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index 1daa2a4..e7af616 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -184,7 +184,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC override def totalActiveActivations = Future.successful(0) override def activeActivationsFor(namespace: UUID) = Future.successful(0) - override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)( + override def publish(action: 
ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = Future.successful { whiskActivationStub map { diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala index 3a55d9b..9f5acad 100644 --- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala @@ -185,12 +185,12 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac override protected def getAction(actionName: FullyQualifiedEntityName)(implicit transid: TransactionId) = { if (!failActionLookup) { def theAction = { - val annotations = Parameters(WhiskAction.finalParamsAnnotationName, JsBoolean(true)) + val annotations = Parameters(WhiskActionMetaData.finalParamsAnnotationName, JsBoolean(true)) - WhiskAction( + WhiskActionMetaData( actionName.path, actionName.name, - jsDefault("??"), + jsDefaultMetaData("??"), defaultActionParameters, annotations = { if (actionName.name.asString.startsWith("export_")) { @@ -242,7 +242,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac override protected[controller] def invokeAction( user: Identity, - action: WhiskAction, + action: WhiskActionMetaData, payload: Option[JsObject], waitForResponse: Option[FiniteDuration], cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = { diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala index a3cbde4..3d0be03 100644 --- a/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala +++ b/tests/src/test/scala/whisk/core/entity/test/ExecHelpers.scala @@ -56,6 +56,15 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { 
js6(code, main) } + protected def js6MetaData(code: String, main: Option[String] = None) = { + CodeExecMetaDataAsString( + RuntimeManifest(NODEJS6, imagename(NODEJS6), default = Some(true), deprecated = Some(false))) + } + + protected def jsDefaultMetaData(code: String, main: Option[String] = None) = { + js6MetaData(code, main) + } + protected def swift(code: String, main: Option[String] = None) = { CodeExecAsString(RuntimeManifest(SWIFT, imagename(SWIFT), deprecated = Some(true)), trim(code), main.map(_.trim)) } -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.