jiangpengcheng commented on a change in pull request #4986:
URL: https://github.com/apache/openwhisk/pull/4986#discussion_r514093768



##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
##########
@@ -346,6 +358,100 @@ case class ExecutableWhiskActionMetaData(namespace: 
EntityPath,
 
 }
 
+case class WhiskActionVersion(id: String, namespace: EntityPath, name: 
EntityName, version: SemVer, publish: Boolean)
+
+object WhiskActionVersion {
+  val serdes = jsonFormat(WhiskActionVersion.apply, "_id", "namespace", 
"name", "version", "publish")
+}
+
+case class WhiskActionVersionList(namespace: EntityPath, name: EntityName, 
versions: Map[SemVer, String]) {
+  def matchedDocId(version: Option[SemVer]): Option[DocId] = {
+    version match {
+      case Some(ver) =>
+        versions.get(ver).map(DocId(_))
+      case None if versions.nonEmpty =>
+        Some(DocId(versions.maxBy(_._1)._2))
+      case _ =>
+        None
+    }
+  }
+}
+
+object WhiskActionVersionList extends 
MultipleReadersSingleWriterCache[WhiskActionVersionList, DocInfo] {
+  val collectionName = "action-versions"
+  lazy val viewName = WhiskQueries.entitiesView(collection = 
collectionName).name
+
+  def cacheKey(action: FullyQualifiedEntityName): CacheKey = {
+    CacheKey(action.fullPath.asString)
+  }
+
+  def get(action: FullyQualifiedEntityName, datastore: EntityStore, fetchAll: 
Boolean = true)(
+    implicit transId: TransactionId): Future[WhiskActionVersionList] = {
+    implicit val logger: Logging = datastore.logging
+    implicit val ec = datastore.executionContext
+
+    val startKey = List(action.fullPath.asString)
+    val endKey = List(action.fullPath.asString, WhiskQueries.TOP)
+    cacheLookup(
+      cacheKey(action),
+      datastore
+        .query(
+          viewName,
+          startKey = startKey,
+          endKey = endKey,
+          skip = 0,
+          limit = 0,
+          includeDocs = false,
+          descending = false,
+          reduce = false,
+          stale = StaleParameter.No)
+        .map { result =>
+          val values = result.map { row =>
+            row.fields("value").asJsObject()
+          }
+          val mappings = values
+            .map(WhiskActionVersion.serdes.read(_))
+            .filter(_.publish || fetchAll)
+            .map { actionVersion =>
+              (actionVersion.version, actionVersion.id)
+            }
+            .toMap
+          WhiskActionVersionList(action.namespace.toPath, action.name, 
mappings)
+        })
+  }
+
+  def getMatchedDocId(
+    action: FullyQualifiedEntityName,
+    version: Option[SemVer],
+    datastore: EntityStore,
+    tryAgain: Boolean = true)(implicit transId: TransactionId, ec: 
ExecutionContext): Future[Option[DocId]] = {
+    get(action, datastore, version.nonEmpty).flatMap { res =>
+      val docId = version match {
+        case Some(ver) =>
+          res.versions.get(ver).map(DocId(_))
+        case None if res.versions.nonEmpty =>
+          Some(DocId(res.versions.maxBy(_._1)._2))
+        case _ =>
+          None
+      }
+      // there may be a chance that database is updated while cache is not, we 
need to invalidate cache and try again
+      if (docId.isEmpty && tryAgain) {
+        WhiskActionVersionList.removeId(cacheKey(action))
+        getMatchedDocId(action, version, datastore, false)

Review comment:
       delete cache in memory first and try again to avoid the inconformity 
between database and cache
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to