[CONF] OpenWhisk > WatcherService

2021-02-22 Thread Jiang PengCheng (Confluence)

Jiang PengCheng created a page
 
WatcherService
 
1. Introduction

This service watches the "/" prefix key in etcd, so it notices every data change made to etcd. Other components can subscribe to or unsubscribe from the WatcherService using a specific key or a key prefix, and they can also specify which event types they are interested in: Put, Delete, or both. When a matching change occurs in etcd, the WatcherService notifies the subscribers via actor messages.

2. Architecture Diagram
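As a rough illustration of the subscribe/notify flow described in the Introduction, the following sketch keeps a set of subscriptions and forwards each etcd event only to subscribers whose key (or key prefix) and event type match. All names here (WatcherSketch, Subscription, WatchEvent, onEtcdEvent) are hypothetical, and plain callbacks stand in for the actor messages the real service uses; it is not the OpenWhisk API.

```scala
// Hypothetical names throughout; this is a sketch, not the OpenWhisk WatcherService.
sealed trait EventType
case object PutEvent extends EventType
case object DeleteEvent extends EventType

// A change observed in etcd for a given key.
final case class WatchEvent(key: String, value: String, eventType: EventType)

// A subscription: an exact key or a key prefix, plus the event types of interest.
// `onEvent` is a plain callback standing in for an actor message.
final case class Subscription(name: String,
                              key: String,
                              isPrefix: Boolean,
                              events: Set[EventType],
                              onEvent: WatchEvent => Unit)

class WatcherSketch {
  private var subscriptions = Map.empty[String, Subscription]

  def subscribe(s: Subscription): Unit = subscriptions += (s.name -> s)

  def unsubscribe(name: String): Unit = subscriptions -= name

  // Called for every change under "/"; forwards it to matching subscribers only.
  def onEtcdEvent(e: WatchEvent): Unit =
    subscriptions.values
      .filter(s => s.events.contains(e.eventType))
      .filter(s => if (s.isPrefix) e.key.startsWith(s.key) else e.key == s.key)
      .foreach(s => s.onEvent(e))
}
```

A subscriber registered with the prefix "/invokers/" and Set(PutEvent) would receive a Put on "/invokers/0" but neither a Delete on the same key nor a Put on an unrelated key.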
This message was sent by Atlassian Confluence 7.5.0




[CONF] OpenWhisk > MongoDB Artifact Store

2021-05-25 Thread Jiang PengCheng (Confluence)

Jiang PengCheng created a page
 
MongoDB Artifact Store
 
Introduction

This work implements MongoDBArtifactStore, which can replace CouchDB. It can also work alongside ElasticSearch as the activation store backend.

Design

Data Schema

The data schema in MongoDB is almost the same as in CouchDB, with 4 differences:

 - The annotations field is a string in MongoDB instead of an array of objects as in CouchDB. Annotations may use an arbitrary structure, and MongoDB does not support "$" as the first character of a field name, so the field is converted to a raw JSON string before it is stored in MongoDB and converted back to an array of objects when it is fetched.
 - The parameters field is handled the same way as annotations.
 - The _rev field is not generated automatically in MongoDB, so it is calculated and inserted explicitly in code.
 - There is a _computed field, which stores some extra fields to help with queries.

Below is an example:
CouchDB:

{
  "_id": "whisk.system/invokerHealthTestAction0",
  "_rev": "68-e72440f911c64ab11441c09e730e5ab8",
  "name": "invokerHealthTestAction0",
  "publish": false,
  "annotations": [],
  "version": "0.0.1",
  "updated": 1524476933182,
  "entityType": "action",
  "exec": {
    "kind": "nodejs:6",
    "code": "function main(params) { return params; }",
    "binary": false
  },
  "parameters": [],
  "limits": {
    "timeout": 6,
    "memory": 256,
    "logs": 10
  },
  "namespace": "whisk.system"
}

MongoDB:

{
  "_id" : "whisk.system/invokerHealthTestAction0",
  "name" : "invokerHealthTestAction0",
  "_computed" : {
    "rootns" : "whisk.system"
  },
  "publish" : false,
  "annotations" : "[ ]",
  "version" : "0.0.1",
  "updated" : NumberLong("1524473794826"),
  "entityType" : "action",
  "exec" : {
    "kind" : "nodejs:6",
    "code" : "function main(params) { return params; }",
    "binary" : false
  },
  "parameters" : "[ ]",
  "limits" : {
    "timeout" : 6,
    "memory" : 256,
    "logs" : 10
  },
  "namespace" : "whisk.system"
}
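
The _rev and _computed differences can be illustrated with a small sketch. It assumes that the revision keeps the CouchDB-style "N-hash" shape seen in the CouchDB example, that the hash is an MD5 of the document body, and that rootns is the first segment of the namespace. None of these details is stated on this page, and the names MongoDocSketch, nextRev, and rootNamespace are hypothetical.

```scala
import java.security.MessageDigest
import scala.util.Try

// Hypothetical helper; a sketch under the assumptions stated above.
object MongoDocSketch {

  // Computes the next revision string: the counter of the previous "_rev"
  // (the part before the first '-') is incremented, and the new body is hashed.
  // MD5 is an assumption, chosen only to match the 32-hex-digit look of the example.
  def nextRev(previousRev: Option[String], body: String): String = {
    val previousCounter = previousRev
      .flatMap(r => Try(r.takeWhile(_ != '-').toInt).toOption)
      .getOrElse(0)
    val digest = MessageDigest.getInstance("MD5").digest(body.getBytes("UTF-8"))
    val hex = digest.map("%02x".format(_)).mkString
    s"${previousCounter + 1}-$hex"
  }

  // Assumed meaning of "_computed.rootns": the namespace up to the first '/'.
  def rootNamespace(namespace: String): String = namespace.takeWhile(_ != '/')
}
```

With these assumptions, a document whose previous revision starts with "68-" would get a revision starting with "69-", and the namespace "whisk.system" maps to the rootns "whisk.system" as in the example.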
 
 
 
Attachment

MongoDB uses GridFS to store and retrieve files that exceed the BSON document size limit of 16 MB. An attachment in MongoDB is stored in a separate collection with an independent _id. This PR uses doc._id + doc.file_name as the attachment's _id field, so the attachment that belongs to a document can be found easily.

Implementation

Apart from the Ansible scripts and test files, there are 5 brand-new files:

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala

It works just like `CouchDbStoreProvider`. It creates a singleton MongoDB client so that WhiskAuthStore, WhiskEntityStore, and WhiskActivationStore share one client.

common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala

An implementation of `trait ArtifactStore[DocumentAbstraction]`. Some of its private methods are worth mentioning:
 
 attach(d: DocumentAbstraction, name: String, contentType: ContentType, docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult]:
   This saves the action's attachment to MongoDB's gridFSBucket. Because the attachment arrives as a Source, a Sink is needed to consume it; this is what MongoDBAsyncStreamSink is used for:
 
 
val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), name, option)
val sink = MongoDBAsyncStreamSink(uploadStream)

val f = docStream
  .runWith(combinedSink(sink))
  .map { r =>
    transid
      .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$id'")
    AttachResult(r.digest, r.length)
  }
  .recover {
    case t: MongoException =>
      transid.failed(
        this,
        start,
        s"[ATT_PUT] '$collName' failed to upload attachment '$name' of document '$id'; error code '${t.getCode}'",
        ErrorLevel)
      throw new Exception("Unexpected mongodb server error: " + t.getMessage)
  }
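
To illustrate the idea behind the excerpt above, the following sketch writes chunks to an output stream while accumulating, in the same pass, the digest and length that end up in AttachResult. It uses a plain Iterator and OutputStream as stand-ins for the Akka Source and the GridFS upload stream. UploadSummary and ChunkedUploadSketch are hypothetical names, and SHA-256 is an assumed digest, since the digest algorithm is not stated here.

```scala
import java.io.OutputStream
import java.security.MessageDigest

// Hypothetical result type mirroring the digest/length pair passed to AttachResult.
final case class UploadSummary(digest: String, length: Long)

object ChunkedUploadSketch {

  // Writes every chunk to `out` and, in the same pass, accumulates length and digest.
  // The stream is closed when all chunks are written or when a write fails.
  def upload(chunks: Iterator[Array[Byte]], out: OutputStream): UploadSummary = {
    val md = MessageDigest.getInstance("SHA-256") // assumed algorithm
    var length = 0L
    try {
      chunks.foreach { chunk =>
        out.write(chunk)
        md.update(chunk)
        length += chunk.length
      }
    } finally out.close()
    UploadSummary(md.digest().map("%02x".format(_)).mkString, length)
  }
}
```

The point of the design is that the attachment is never held in memory as a whole: each chunk is written and hashed as it arrives, which is also why the real code works with a Source and a Sink.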

 
 
 
 
 readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(implicit transid: TransactionId): Future[T]:
   In contrast to the attach method, this reads an attachment from MongoDB. Because a Sink is used to consume the result, a Source has to be constructed for the MongoDB attachment; this is what MongoDBAsyncStreamSource is for:
 
 

val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))

def readStream(file: GridFSFile) = {
  val source = MongoDBAsyncStreamSource(downloadStream)
  source
    .runWith(sink)
    .map { result =>
      transid
        .finished(
          this,
          start,
          s"[ATT_GET] '$collName' completed: found attachment '$attachmentName' of document '$doc'")
      result
    }
}

def getGridFSFile = {
  downloadStream