This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b46478b Introduce a AttachmentStore SPI (#3453) b46478b is described below commit b46478bfe6b5068fd57f78cea44b2a6326e89f4b Author: Chetan Mehrotra <chet...@apache.org> AuthorDate: Thu Jun 14 10:35:08 2018 +0530 Introduce a AttachmentStore SPI (#3453) Adds an SPI which defines how an attachment can be stored in a blob/object storage like S3. CouchDBRestStore now supports storing attachments either in Couch or in an AttachmentStore. Attachment Storage Flow ======================= While storing an attachment the flow varies based on storage type. Storing in Couch ---------------- 1. Check if it can be inlined 2. If not inlined generate an attachment name 3. Update the attachment name in Document and store it in Couch 4. Attach the attachment if not inlined Storing in AttachmentStore -------------------------- 1. Check if it can be inlined 2. If not inlined generate an attachment name 3. Upload the attachment to AttachmentStore 4. Update the attachment name in Document and store it in Couch 5. Delete the older attachment Attachment Read Flow ==================== While reading an attachment 1. Get attachment scheme 2. If inline -> Read and return 3. If scheme is 1. 'couch' - read from couch 2. No scheme - Attachment in old format which is also stored in Couch only. So read from couch 4. If scheme matches the attachment store scheme then read from AttachmentStore Attachment Metadata =================== AttachmentStore would also compute the following metadata as part of the attach flow 1. length 2. 
content hash based on SHA-256 MessageDigest algorithm Both of these metadata are computed as part of stream processing via using multi broadcast flow --- .../src/main/scala/whisk/common/Logging.scala | 2 + .../core/database/ArtifactStoreProvider.scala | 14 +- .../whisk/core/database/AttachmentInliner.scala | 8 +- .../whisk/core/database/AttachmentStore.scala | 71 ++++++++ .../whisk/core/database/CouchDbRestStore.scala | 146 ++++++++++++----- .../whisk/core/database/CouchDbStoreProvider.scala | 11 +- .../scala/whisk/core/database/StoreUtils.scala | 54 ++++++- .../core/database/memory/MemoryArtifactStore.scala | 111 +++++-------- .../database/memory/MemoryAttachmentStore.scala | 139 ++++++++++++++++ .../main/scala/whisk/core/entity/Attachments.scala | 10 +- .../core/database/CouchDBArtifactStoreTests.scala | 26 +-- .../database/CouchDBAttachmentStoreTests.scala | 25 ++- ...eTests.scala => CouchDBStoreBehaviorBase.scala} | 29 ++-- .../database/memory/MemoryArtifactStoreTests.scala | 4 + ...ests.scala => MemoryAttachmentStoreTests.scala} | 32 ++-- .../test/AttachmentCompatibilityTests.scala | 44 ++++- .../database/test/AttachmentStoreBehaviors.scala | 178 +++++++++++++++++++++ .../scala/whisk/core/database/test/DbUtils.scala | 34 ++-- .../ArtifactStoreAttachmentBehaviors.scala | 92 ++++++++++- .../test/behavior/ArtifactStoreBehaviorBase.scala | 31 +++- 20 files changed, 846 insertions(+), 215 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index 0e21fe3..a82e56b 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -312,5 +312,7 @@ object LoggingMarkers { val DATABASE_QUERY = LogMarkerToken(database, "queryView", start) val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start) val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start) + val DATABASE_ATT_DELETE 
= LogMarkerToken(database, "deleteDocumentAttachment", start) + val DATABASE_ATTS_DELETE = LogMarkerToken(database, "deleteDocumentAttachments", start) val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count) } diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala index fdcc306..32d6150 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala @@ -19,9 +19,10 @@ package whisk.core.database import akka.actor.ActorSystem import akka.stream.ActorMaterializer +import com.typesafe.config.ConfigFactory import spray.json.RootJsonFormat import whisk.common.Logging -import whisk.spi.Spi +import whisk.spi.{Spi, SpiLoader} import whisk.core.entity.DocumentReader import scala.reflect.ClassTag @@ -36,4 +37,15 @@ trait ArtifactStoreProvider extends Spi { actorSystem: ActorSystem, logging: Logging, materializer: ActorMaterializer): ArtifactStore[D] + + protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]()( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): Option[AttachmentStore] = { + if (ConfigFactory.load().hasPath("whisk.spi.AttachmentStoreProvider")) { + Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]()) + } else { + None + } + } } diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala index 14eb192..2369b1b 100644 --- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala +++ b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala @@ -17,7 +17,6 @@ package whisk.core.database -import java.security.MessageDigest import java.util.Base64 import akka.NotUsed @@ -46,8 +45,6 @@ case class InliningConfig(maxInlineSize: ByteSize, chunkSize: 
ByteSize) * name itself. */ trait AttachmentInliner { - private val digestAlgo = "SHA-256" - private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "") /** Materializer required for stream processing */ protected[core] implicit val materializer: Materializer @@ -94,10 +91,9 @@ trait AttachmentInliner { protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme protected[database] def digest(bytes: TraversableOnce[Byte]): String = { - val digester = MessageDigest.getInstance(digestAlgo) + val digester = StoreUtils.emptyDigest() digester.update(bytes.toArray) - val digest = digester.digest().map("%02x".format(_)).mkString - s"$encodedAlgoName-$digest" + StoreUtils.encodeDigest(digester.digest()) } /** diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala new file mode 100644 index 0000000..e606f1d --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package whisk.core.database + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.ContentType +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString +import whisk.common.{Logging, TransactionId} +import whisk.core.entity.DocId +import whisk.spi.Spi + +import scala.concurrent.{ExecutionContext, Future} +import scala.reflect.ClassTag + +trait AttachmentStoreProvider extends Spi { + def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): AttachmentStore +} + +case class AttachResult(digest: String, length: Long) + +trait AttachmentStore { + + /** Identifies the store type */ + protected[core] def scheme: String + + /** Execution context for futures */ + protected[core] implicit val executionContext: ExecutionContext + + /** + * Attaches a "file" of type `contentType` to an existing document. + */ + protected[core] def attach(doc: DocId, name: String, contentType: ContentType, docStream: Source[ByteString, _])( + implicit transid: TransactionId): Future[AttachResult] + + /** + * Retrieves a saved attachment, streaming it into the provided Sink. + */ + protected[core] def readAttachment[T](doc: DocId, name: String, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] + + /** + * Deletes all attachments linked to given document + */ + protected[core] def deleteAttachments(doc: DocId)(implicit transid: TransactionId): Future[Boolean] + + /** + * Deletes specific attachment. + */ + protected[core] def deleteAttachment(doc: DocId, name: String)(implicit transid: TransactionId): Future[Boolean] + + /** Shut it down. After this invocation, every other call is invalid. 
*/ + def shutdown(): Unit +} diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index aff7e85..44579d3 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -27,7 +27,7 @@ import spray.json._ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} import whisk.core.database.StoreUtils._ import whisk.core.entity.Attachments.Attached -import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID} +import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader, UUID} import whisk.http.Messages import scala.concurrent.duration._ @@ -52,7 +52,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St dbPassword: String, dbName: String, useBatching: Boolean = false, - val inliningConfig: InliningConfig)( + val inliningConfig: InliningConfig, + val attachmentStore: Option[AttachmentStore])( implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction], @@ -64,7 +65,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher") - val attachmentScheme: String = "couch" + private val couchScheme = "couch" + val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(couchScheme) private val client: CouchDbRestClient = new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName) @@ -351,12 +353,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St } override protected[database] def putAndAttach[A <: DocumentAbstraction]( - d: A, + doc: A, update: (A, Attached) => A, contentType: ContentType, docStream: Source[ByteString, _], oldAttachment: 
Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { + attachmentStore match { + case Some(_) => + attachToExternalStore(doc, update, contentType, docStream, oldAttachment) + case None => + attachToCouch(doc, update, contentType, docStream) + } + } + + private def attachToCouch[A <: DocumentAbstraction]( + doc: A, + update: (A, Attached) => A, + contentType: ContentType, + docStream: Source[ByteString, _])(implicit transid: TransactionId) = { for { (bytes, tailSource) <- inlineAndTail(docStream) uri <- Future.successful(uriOf(bytes, UUID().asString)) @@ -368,13 +383,56 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St } Future.successful(a) } - i1 <- put(update(d, attached)) - i2 <- if (isInlined(uri)) { Future.successful(i1) } else { + i1 <- put(update(doc, attached)) + i2 <- if (isInlined(uri)) { + Future.successful(i1) + } else { attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource)) } } yield (i2, attached) } + private def attachToExternalStore[A <: DocumentAbstraction]( + doc: A, + update: (A, Attached) => A, + contentType: ContentType, + docStream: Source[ByteString, _], + oldAttachment: Option[Attached])(implicit transid: TransactionId) = { + val as = attachmentStore.get + val asJson = doc.toDocumentRecord + val id = asJson.fields("_id").convertTo[String].trim + + for { + (bytes, tailSource) <- inlineAndTail(docStream) + uri <- Future.successful(uriOf(bytes, UUID().asString)) + attached <- { + // Upload if cannot be inlined + if (isInlined(uri)) { + val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes))) + Future.successful(a) + } else { + as.attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource)) + .map { r => + Attached(uri.toString, contentType, Some(r.length), Some(r.digest)) + } + } + } + i1 <- put(update(doc, attached)) + + //Remove old attachment if it was part of attachmentStore + _ <- 
oldAttachment + .map { old => + val oldUri = Uri(old.attachmentName) + if (oldUri.scheme == as.scheme) { + as.deleteAttachment(DocId(id), oldUri.path.toString) + } else { + Future.successful(true) + } + } + .getOrElse(Future.successful(true)) + } yield (i1, attached) + } + private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])( implicit transid: TransactionId): Future[DocInfo] = { @@ -421,8 +479,26 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])( implicit transid: TransactionId): Future[T] = { - val name = attached.attachmentName + val attachmentUri = Uri(name) + attachmentUri.scheme match { + case AttachmentInliner.MemScheme => + memorySource(attachmentUri).runWith(sink) + case s if s == couchScheme || attachmentUri.isRelative => + //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile' + //Compared to current approach of '<scheme>:<name>' + readAttachmentFromCouch(doc, attachmentUri, sink) + case s if attachmentStore.isDefined && attachmentStore.get.scheme == s => + attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) + case _ => + throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri") + } + } + + private def readAttachmentFromCouch[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] = { + + val name = attachmentUri.path val start = transid.started( this, LoggingMarkers.DATABASE_ATT_GET, @@ -431,31 +507,28 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St require(doc != null, "doc undefined") require(doc.rev.rev != null, "doc revision must be specified") - val attachmentUri = Uri(name) - val g = if (isInlined(attachmentUri)) { - 
memorySource(attachmentUri).runWith(sink) - } else { - val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink) - f.map { - case Right((_, result)) => - transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'") - result - - case Left(StatusCodes.NotFound) => - transid.finished( - this, - start, - s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$doc'; not found.") - throw NoDocumentException("Not found on 'readAttachment'.") - - case Left(code) => - transid.failed( - this, - start, - s"[ATT_GET] '$dbName' failed to get attachment '$name' of document '$doc'; http status: '${code}'") - throw new Exception("Unexpected http response code: " + code) - } - } + val g = + client + .getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink) + .map { + case Right((_, result)) => + transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'") + result + + case Left(StatusCodes.NotFound) => + transid.finished( + this, + start, + s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$doc'; not found.") + throw NoDocumentException("Not found on 'readAttachment'.") + + case Left(code) => + transid.failed( + this, + start, + s"[ATT_GET] '$dbName' failed to get attachment '$name' of document '$doc'; http status: '$code'") + throw new Exception("Unexpected http response code: " + code) + } reportFailure( g, @@ -468,12 +541,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St } override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = - // NOTE: this method is not intended for standalone use for CouchDB. - // To delete attachments, it is expected that the entire document is deleted. 
- Future.successful(true) + attachmentStore + .map(as => as.deleteAttachments(doc.id)) + .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted. override def shutdown(): Unit = { Await.ready(client.shutdown(), 1.minute) + attachmentStore.foreach(_.shutdown()) } private def processAttachments[A <: DocumentAbstraction](doc: A, @@ -492,7 +566,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St } attachmentHandler( doc, - Attached(getAttachmentName(name), contentType, Some(length.intValue()), Some(digest))) + Attached(getAttachmentName(name), contentType, Some(length.longValue()), Some(digest))) case x => throw DeserializationException("Attachment json does not have required fields" + x) diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala index df6a374..1fc11a1 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala @@ -53,6 +53,14 @@ object CouchDbStoreProvider extends ArtifactStoreProvider { docReader: DocumentReader, actorSystem: ActorSystem, logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = makeArtifactStore(useBatching, getAttachmentStore()) + + def makeArtifactStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean, + attachmentStore: Option[AttachmentStore])( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, materializer: ActorMaterializer): ArtifactStore[D] = { val dbConfig = loadConfigOrThrow[CouchDbConfig](ConfigKeys.couchdb) require( @@ -69,6 +77,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider { dbConfig.password, dbConfig.databaseFor[D], useBatching, - inliningConfig) + inliningConfig, + attachmentStore) } } diff --git 
a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala index cff9dd9..f6351d1 100644 --- a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala +++ b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala @@ -17,15 +17,22 @@ package whisk.core.database +import java.security.MessageDigest + import akka.event.Logging.ErrorLevel +import akka.stream.SinkShape +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink} +import akka.util.ByteString +import spray.json.DefaultJsonProtocol._ import spray.json.{JsObject, RootJsonFormat} import whisk.common.{Logging, StartMarker, TransactionId} -import spray.json.DefaultJsonProtocol._ import whisk.core.entity.{DocInfo, DocRevision, DocumentReader, WhiskDocument} import scala.concurrent.{ExecutionContext, Future} private[database] object StoreUtils { + private val digestAlgo = "SHA-256" + private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "") def reportFailure[T](f: Future[T], start: StartMarker, failureMessage: Throwable => String)( implicit transid: TransactionId, @@ -65,4 +72,49 @@ private[database] object StoreUtils { // FIXME remove mutability from appropriate classes now that it is no longer required by GSON. 
deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev)) } + + def combinedSink[T](dest: Sink[ByteString, Future[T]])( + implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = { + Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) { + implicit builder => (dgs, ls, dests) => + import GraphDSL.Implicits._ + + val bcast = builder.add(Broadcast[ByteString](3)) + + bcast ~> dgs.in + bcast ~> ls.in + bcast ~> dests.in + + SinkShape(bcast.in) + }) + } + + def emptyDigest(): MessageDigest = MessageDigest.getInstance(digestAlgo) + + def encodeDigest(bytes: Array[Byte]): String = { + val digest = bytes.map("%02x".format(_)).mkString + s"$encodedAlgoName-$digest" + } + + private def combineResult[T](digest: Future[String], length: Future[Long], upload: Future[T])( + implicit ec: ExecutionContext) = { + for { + d <- digest + l <- length + u <- upload + } yield AttachmentUploadResult(d, l, u) + } + + case class AttachmentUploadResult[T](digest: String, length: Long, uploadResult: T) + + private def digestSink(): Sink[ByteString, Future[String]] = { + Flow[ByteString] + .fold(emptyDigest())((digest, bytes) => { digest.update(bytes.toArray); digest }) + .map(md => encodeDigest(md.digest())) + .toMat(Sink.head)(Keep.right) + } + + private def lengthSink(): Sink[ByteString, Future[Long]] = { + Sink.fold[Long, ByteString](0)((length, bytes) => length + bytes.size) + } } diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala index 75973ff..140b854 100644 --- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala @@ -20,8 +20,8 @@ package whisk.core.database.memory import akka.actor.ActorSystem import akka.http.scaladsl.model.{ContentType, Uri} import 
akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.{ByteString, ByteStringBuilder} +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString import pureconfig.loadConfigOrThrow import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsString, RootJsonFormat} import whisk.common.{Logging, LoggingMarkers, TransactionId} @@ -45,11 +45,20 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider { actorSystem: ActorSystem, logging: Logging, materializer: ActorMaterializer): ArtifactStore[D] = { + makeArtifactStore(MemoryAttachmentStoreProvider.makeStore()) + } + + def makeArtifactStore[D <: DocumentSerializer: ClassTag](attachmentStore: AttachmentStore)( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = { val classTag = implicitly[ClassTag[D]] val (dbName, handler, viewMapper) = handlerAndMapper(classTag) val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db) - new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig) + new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig, attachmentStore) } private def handlerAndMapper[D](entityType: ClassTag[D])( @@ -75,7 +84,8 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider { class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: String, documentHandler: DocumentHandler, viewMapper: MemoryViewMapper, - val inliningConfig: InliningConfig)( + val inliningConfig: InliningConfig, + val attachmentStore: AttachmentStore)( implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction], @@ -92,7 +102,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str private val _id = "_id" private val _rev = "_rev" - val attachmentScheme = "mems" + val attachmentScheme: String = 
attachmentStore.scheme override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { val asJson = d.toDocumentRecord @@ -248,7 +258,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])( implicit transid: TransactionId): Future[T] = { - //TODO Temporary implementation till MemoryAttachmentStore PR is merged val name = attached.attachmentName val start = transid.started( this, @@ -260,20 +269,17 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str memorySource(attachmentUri).runWith(sink) } else { val storedName = attachmentUri.path.toString() - artifacts.get(doc.id.id) match { - case Some(a: Artifact) if a.attachments.contains(storedName) => - val attachment = a.attachments(storedName) - val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run + val f = attachmentStore.readAttachment(doc.id, storedName, sink) + f.onSuccess { + case _ => transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'") - r - case None => - Future.failed(NoDocumentException("Not found on 'readAttachment'.")) } + f } } override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = { - Future.successful(true) + attachmentStore.deleteAttachments(doc.id) } override protected[database] def putAndAttach[A <: DocumentAbstraction]( @@ -283,56 +289,42 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str docStream: Source[ByteString, _], oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { + val asJson = d.toDocumentRecord + val id = asJson.fields(_id).convertTo[String].trim //Inlined attachment with Memory storage is not required. 
However to validate the constructs //inlined support is implemented for { - allBytes <- toByteString(docStream) - (bytes, tailSource) <- inlineAndTail(Source.single(allBytes)) + (bytes, tailSource) <- inlineAndTail(docStream) uri <- Future.successful(uriOf(bytes, UUID().asString)) attached <- { - val a = if (isInlined(uri)) { - Attached(uri.toString(), contentType, Some(bytes.size), Some(digest(bytes))) + if (isInlined(uri)) { + val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes))) + Future.successful(a) } else { - Attached(uri.toString(), contentType, Some(allBytes.size), Some(digest(allBytes))) + attachmentStore + .attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource)) + .map { r => + Attached(uri.toString, contentType, Some(r.length), Some(r.digest)) + } } - Future.successful(a) } i1 <- put(update(d, attached)) - i2 <- if (isInlined(uri)) { Future.successful(i1) } else { - attach(i1, uri.path.toString(), attached.attachmentType, toByteString(combinedSource(bytes, tailSource))) - } - } yield (i2, attached) - } - - private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: Future[ByteString])( - implicit transid: TransactionId): Future[DocInfo] = { - - val start = transid.started( - this, - LoggingMarkers.DATABASE_ATT_SAVE, - s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'") - - //TODO Temporary implementation till MemoryAttachmentStore PR is merged - bytes.map { b => - artifacts.get(doc.id.id) match { - case Some(a) => - val existing = Artifact(doc, a.doc, a.computed) - val updated = existing.attach(name, Attachment(b, contentType)) - if (artifacts.replace(doc.id.id, existing, updated)) { - transid - .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'") - updated.docInfo + _ <- oldAttachment + .map { old => + val oldUri = Uri(old.attachmentName) + if (oldUri.scheme == attachmentStore.scheme) { + 
attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString) } else { - throw DocumentConflictException("conflict on 'put'") + Future.successful(true) } - case None => - throw DocumentConflictException("conflict on 'put'") - } - } + } + .getOrElse(Future.successful(true)) + } yield (i1, attached) } override def shutdown(): Unit = { artifacts.clear() + attachmentStore.shutdown() } override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = { @@ -354,9 +346,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure: '${failure.getMessage}'") } - private def toByteString(docStream: Source[Traversable[Byte], _]) = - docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact) - private def getRevision(asJson: JsObject) = { asJson.fields.get(_rev) match { case Some(JsString(r)) => r.toInt @@ -367,21 +356,14 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str //Use curried case class to allow equals support only for id and rev //This allows us to implement atomic replace and remove which check //for id,rev equality only - private case class Artifact(id: String, rev: Int)(val doc: JsObject, - val computed: JsObject, - val attachments: Map[String, Attachment] = Map.empty) { + private case class Artifact(id: String, rev: Int)(val doc: JsObject, val computed: JsObject) { def incrementRev(): Artifact = { val (newRev, updatedDoc) = incrementAndGet() - copy(rev = newRev)(updatedDoc, computed, Map.empty) //With Couch attachments are lost post update + copy(rev = newRev)(updatedDoc, computed) //With Couch attachments are lost post update } def docInfo = DocInfo(DocId(id), DocRevision(rev.toString)) - def attach(name: String, attachment: Attachment): Artifact = { - val (newRev, updatedDoc) = incrementAndGet() - copy(rev = newRev)(updatedDoc, 
computed, attachments + (name -> attachment)) - } - private def incrementAndGet() = { val newRev = rev + 1 val updatedDoc = JsObject(doc.fields + (_rev -> JsString(newRev.toString))) @@ -389,8 +371,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str } } - private case class Attachment(bytes: ByteString, contentType: ContentType) - private object Artifact { def apply(id: String, rev: Int, doc: JsObject): Artifact = { Artifact(id, rev)(doc, documentHandler.computedFields(doc)) @@ -399,10 +379,5 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str def apply(info: DocInfo): Artifact = { Artifact(info.id.id, info.rev.rev.toInt)(JsObject.empty, JsObject.empty) } - - def apply(info: DocInfo, doc: JsObject, c: JsObject): Artifact = { - Artifact(info.id.id, info.rev.rev.toInt)(doc, c) - } } - } diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala new file mode 100644 index 0000000..face871 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.database.memory + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.ContentType +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.{ByteString, ByteStringBuilder} +import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE} +import whisk.common.{Logging, TransactionId} +import whisk.core.database.StoreUtils._ +import whisk.core.database._ +import whisk.core.entity.DocId + +import scala.collection.concurrent.TrieMap +import scala.concurrent.{ExecutionContext, Future} +import scala.reflect.ClassTag + +object MemoryAttachmentStoreProvider extends AttachmentStoreProvider { + override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): AttachmentStore = + new MemoryAttachmentStore(implicitly[ClassTag[D]].runtimeClass.getSimpleName.toLowerCase) +} + +/** + * Basic in-memory AttachmentStore implementation. Useful for testing. 
+ */ +class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem, + logging: Logging, + materializer: ActorMaterializer) + extends AttachmentStore { + + override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher + + private case class Attachment(bytes: ByteString) + + private val attachments = new TrieMap[String, Attachment] + private var closed = false + + override val scheme = "mems" + + override protected[core] def attach( + docId: DocId, + name: String, + contentType: ContentType, + docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = { + require(name != null, "name undefined") + val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document '$docId'") + + val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b) + + val f = docStream.runWith(combinedSink(uploadSink)) + + val g = f.map { r => + attachments += (attachmentKey(docId, name) -> Attachment(r.uploadResult.result().compact)) + transid + .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$docId'") + AttachResult(r.digest, r.length) + } + + reportFailure( + g, + start, + failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'") + } + + /** + * Retrieves a saved attachment, streaming it into the provided Sink. 
+ */ + override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] = { + + val start = + transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding attachment '$name' of document '$docId'") + + val f = attachments.get(attachmentKey(docId, name)) match { + case Some(Attachment(bytes)) => + val r = Source.single(bytes).toMat(sink)(Keep.right).run + r.map(t => { + transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$docId'") + t + }) + case None => + transid.finished( + this, + start, + s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$docId'; not found.") + Future.failed(NoDocumentException("Not found on 'readAttachment'.")) + } + reportFailure( + f, + start, + failure => s"[ATT_GET] '$dbName' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'") + } + + override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = { + val start = transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] uploading attachment of document '$docId'") + + val prefix = docId + "/" + attachments --= attachments.keySet.filter(_.startsWith(prefix)) + transid.finished(this, start, s"[ATTS_DELETE] completed: delete attachment of document '$docId'") + Future.successful(true) + } + + override protected[core] def deleteAttachment(docId: DocId, name: String)( + implicit transid: TransactionId): Future[Boolean] = { + val start = transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] uploading attachment of document '$docId'") + attachments.remove(attachmentKey(docId, name)) + transid.finished(this, start, s"[ATT_DELETE] completed: delete attachment of document '$docId'") + Future.successful(true) + } + + def attachmentCount: Int = attachments.size + + def isClosed = closed + + override def shutdown(): Unit = { + closed = true + } + + private def 
attachmentKey(docId: DocId, name: String) = s"${docId.id}/$name" +} diff --git a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala index 6ec9bae..2d00a03 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala @@ -17,15 +17,13 @@ package whisk.core.entity -import scala.util.Try - import akka.http.scaladsl.model.ContentType - -import spray.json._ import spray.json.DefaultJsonProtocol._ - +import spray.json._ import whisk.core.entity.size._ +import scala.util.Try + object Attachments { /** @@ -43,7 +41,7 @@ object Attachments { case class Attached(attachmentName: String, attachmentType: ContentType, - length: Option[Int] = None, + length: Option[Long] = None, digest: Option[String] = None) extends Attachment[Nothing] diff --git a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala index a36fd30..159ac5b 100644 --- a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala +++ b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala @@ -21,30 +21,6 @@ import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import whisk.core.database.test.behavior.ArtifactStoreBehavior -import whisk.core.entity._ - -import scala.reflect.classTag @RunWith(classOf[JUnitRunner]) -class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior { - override def storeType = "CouchDB" - - override val authStore = { - implicit val docReader: DocumentReader = WhiskDocumentReader - CouchDbStoreProvider.makeStore[WhiskAuth]() - } - - override val entityStore = - CouchDbStoreProvider.makeStore[WhiskEntity]()( - classTag[WhiskEntity], - WhiskEntityJsonFormat, - WhiskDocumentReader, - actorSystem, - logging, - materializer) - - override val 
activationStore = { - implicit val docReader: DocumentReader = WhiskDocumentReader - CouchDbStoreProvider.makeStore[WhiskActivation]() - } -} +class CouchDBArtifactStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreBehavior {} diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala similarity index 60% copy from common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala copy to tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala index fdcc306..e0d7986 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala +++ b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala @@ -17,23 +17,16 @@ package whisk.core.database -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import spray.json.RootJsonFormat -import whisk.common.Logging -import whisk.spi.Spi -import whisk.core.entity.DocumentReader +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner +import whisk.core.database.memory.MemoryAttachmentStoreProvider +import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors import scala.reflect.ClassTag -/** - * An Spi for providing ArtifactStore implementations - */ -trait ArtifactStoreProvider extends Spi { - def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean = false)( - implicit jsonFormat: RootJsonFormat[D], - docReader: DocumentReader, - actorSystem: ActorSystem, - logging: Logging, - materializer: ActorMaterializer): ArtifactStore[D] +@RunWith(classOf[JUnitRunner]) +class CouchDBAttachmentStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors { + override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() = + Some(MemoryAttachmentStoreProvider.makeStore[D]()) } diff --git 
a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala similarity index 58% copy from tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala copy to tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala index a36fd30..0a13075 100644 --- a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala +++ b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala @@ -17,25 +17,29 @@ package whisk.core.database -import org.junit.runner.RunWith import org.scalatest.FlatSpec -import org.scalatest.junit.JUnitRunner -import whisk.core.database.test.behavior.ArtifactStoreBehavior -import whisk.core.entity._ +import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase +import whisk.core.entity.{ + DocumentReader, + WhiskActivation, + WhiskAuth, + WhiskDocumentReader, + WhiskEntity, + WhiskEntityJsonFormat +} -import scala.reflect.classTag +import scala.reflect.{classTag, ClassTag} -@RunWith(classOf[JUnitRunner]) -class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior { +trait CouchDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase { override def storeType = "CouchDB" override val authStore = { implicit val docReader: DocumentReader = WhiskDocumentReader - CouchDbStoreProvider.makeStore[WhiskAuth]() + CouchDbStoreProvider.makeArtifactStore[WhiskAuth](useBatching = false, getAttachmentStore[WhiskAuth]()) } override val entityStore = - CouchDbStoreProvider.makeStore[WhiskEntity]()( + CouchDbStoreProvider.makeArtifactStore[WhiskEntity](useBatching = false, getAttachmentStore[WhiskEntity]())( classTag[WhiskEntity], WhiskEntityJsonFormat, WhiskDocumentReader, @@ -45,6 +49,11 @@ class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior { override val activationStore = { implicit val docReader: DocumentReader = WhiskDocumentReader - 
CouchDbStoreProvider.makeStore[WhiskActivation]() + CouchDbStoreProvider.makeArtifactStore[WhiskActivation](useBatching = true, getAttachmentStore[WhiskActivation]()) } + + override protected def getAttachmentStore(store: ArtifactStore[_]) = + store.asInstanceOf[CouchDbRestStore[_]].attachmentStore + + protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): Option[AttachmentStore] = None } diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala index 9d4643f..c47daaa 100644 --- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala +++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala @@ -20,6 +20,7 @@ package whisk.core.database.memory import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import whisk.core.database.ArtifactStore import whisk.core.database.test.behavior.ArtifactStoreBehavior import whisk.core.entity._ @@ -47,4 +48,7 @@ class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior { implicit val docReader: DocumentReader = WhiskDocumentReader MemoryArtifactStoreProvider.makeStore[WhiskActivation]() } + + override protected def getAttachmentStore(store: ArtifactStore[_]) = + Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore) } diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala similarity index 55% copy from tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala copy to tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala index 9d4643f..9d845bc 100644 --- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala +++ b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala 
@@ -17,34 +17,24 @@ package whisk.core.database.memory +import common.WskActorSystem import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import whisk.core.database.test.behavior.ArtifactStoreBehavior -import whisk.core.entity._ - -import scala.reflect.classTag +import whisk.core.database.AttachmentStore +import whisk.core.database.test.AttachmentStoreBehaviors +import whisk.core.entity.WhiskEntity @RunWith(classOf[JUnitRunner]) -class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior { - override def storeType = "Memory" +class MemoryAttachmentStoreTests extends FlatSpec with AttachmentStoreBehaviors with WskActorSystem { - override val authStore = { - implicit val docReader: DocumentReader = WhiskDocumentReader - MemoryArtifactStoreProvider.makeStore[WhiskAuth]() - } + override val store: AttachmentStore = MemoryAttachmentStoreProvider.makeStore[WhiskEntity]() - override val entityStore = - MemoryArtifactStoreProvider.makeStore[WhiskEntity]()( - classTag[WhiskEntity], - WhiskEntityJsonFormat, - WhiskDocumentReader, - actorSystem, - logging, - materializer) + override def storeType: String = "Memory" - override val activationStore = { - implicit val docReader: DocumentReader = WhiskDocumentReader - MemoryArtifactStoreProvider.makeStore[WhiskActivation]() + override def afterAll(): Unit = { + super.afterAll() + val count = store.asInstanceOf[MemoryAttachmentStore].attachmentCount + require(count == 0, s"AttachmentStore not empty after all runs - $count") } } diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala index 6550cc0..dc8969f 100644 --- a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala +++ b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala @@ -33,7 +33,8 @@ import pureconfig.loadConfigOrThrow import 
spray.json.DefaultJsonProtocol import whisk.common.TransactionId import whisk.core.ConfigKeys -import whisk.core.database.{CouchDbConfig, CouchDbRestClient, NoDocumentException} +import whisk.core.database.memory.MemoryAttachmentStoreProvider +import whisk.core.database.{CouchDbConfig, CouchDbRestClient, CouchDbStoreProvider, NoDocumentException} import whisk.core.entity.Attachments.Inline import whisk.core.entity.test.ExecHelpers import whisk.core.entity.{ @@ -42,11 +43,14 @@ import whisk.core.entity.{ EntityName, EntityPath, WhiskAction, + WhiskDocumentReader, WhiskEntity, + WhiskEntityJsonFormat, WhiskEntityStore } import scala.concurrent.Future +import scala.reflect.classTag @RunWith(classOf[JUnitRunner]) class AttachmentCompatibilityTests @@ -93,6 +97,28 @@ class AttachmentCompatibilityTests val doc = WhiskAction(namespace, EntityName("attachment_unique"), exec) + createAction(doc) + + val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue + doc2.exec shouldBe exec + } + + it should "read attachments created using old scheme with AttachmentStore" in { + implicit val tid: TransactionId = transid() + val namespace = EntityPath("attachment-compat-test2") + val exec = javaDefault("ZHViZWU=", Some("hello")) + val doc = + WhiskAction(namespace, EntityName("attachment_unique"), exec) + + createAction(doc) + + val entityStore2 = createEntityStore() + val doc2 = WhiskAction.get(entityStore2, doc.docid).futureValue + doc2.exec shouldBe exec + } + + private def createAction(doc: WhiskAction) = { + implicit val tid: TransactionId = transid() doc.exec match { case exec @ CodeExecAsAttachment(_, Inline(code), _) => val attached = exec.manifest.attached.get @@ -109,9 +135,6 @@ class AttachmentCompatibilityTests case _ => fail("Exec must be code attachment") } - - val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue - doc2.exec shouldBe exec } private def attach(doc: DocInfo, @@ -131,4 +154,17 @@ class AttachmentCompatibilityTests throw new 
Exception("Unexpected http response code: " + code) } } + + private def createEntityStore() = + CouchDbStoreProvider + .makeArtifactStore[WhiskEntity]( + useBatching = false, + Some(MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()))( + classTag[WhiskEntity], + WhiskEntityJsonFormat, + WhiskDocumentReader, + actorSystem, + logging, + materializer) + } diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala new file mode 100644 index 0000000..0b5a879 --- /dev/null +++ b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.database.test + +import java.io.ByteArrayInputStream + +import akka.http.scaladsl.model.ContentTypes +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source, StreamConverters} +import akka.util.{ByteString, ByteStringBuilder} +import common.{StreamLogging, WskActorSystem} +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import whisk.common.TransactionId +import whisk.core.database.{AttachmentStore, NoDocumentException} +import whisk.core.entity.DocId + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt +import scala.util.Random + +trait AttachmentStoreBehaviors + extends ScalaFutures + with DbUtils + with Matchers + with StreamLogging + with WskActorSystem + with BeforeAndAfterAll { + this: FlatSpec => + + //Bring in sync the timeout used by ScalaFutures and DBUtils + implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout) + + protected implicit val materializer: ActorMaterializer = ActorMaterializer() + + protected val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}" + + private val attachmentsToDelete = ListBuffer[String]() + + def store: AttachmentStore + + def storeType: String + + def garbageCollectAttachments: Boolean = true + + behavior of s"$storeType AttachmentStore" + + it should "add and read attachment" in { + implicit val tid: TransactionId = transid() + val bytes = randomBytes(16023) + + val docId = newDocId() + val result = store.attach(docId, "code", ContentTypes.`application/octet-stream`, chunkedSource(bytes)).futureValue + + result.length shouldBe 16023 + + val byteBuilder = store.readAttachment(docId, "code", byteStringSink()).futureValue + + byteBuilder.result() shouldBe ByteString(bytes) + garbageCollect(docId) + } + + it should "add and delete attachments" in { + implicit val tid: TransactionId = transid() + 
val b1 = randomBytes(1000) + val b2 = randomBytes(2000) + val b3 = randomBytes(3000) + + val docId = newDocId() + val r1 = store.attach(docId, "c1", ContentTypes.`application/octet-stream`, chunkedSource(b1)).futureValue + val r2 = store.attach(docId, "c2", ContentTypes.`application/json`, chunkedSource(b2)).futureValue + val r3 = store.attach(docId, "c3", ContentTypes.`application/json`, chunkedSource(b3)).futureValue + + r1.length shouldBe 1000 + r2.length shouldBe 2000 + r3.length shouldBe 3000 + + attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1) + attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2) + attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3) + + //Delete single attachment + store.deleteAttachment(docId, "c1").futureValue shouldBe true + + //Non deleted attachments related to same docId must still be accessible + attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException] + attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2) + attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3) + + //Delete all attachments + store.deleteAttachments(docId).futureValue shouldBe true + + attachmentBytes(docId, "c2").failed.futureValue shouldBe a[NoDocumentException] + attachmentBytes(docId, "c3").failed.futureValue shouldBe a[NoDocumentException] + } + + it should "throw NoDocumentException on reading non existing attachment" in { + implicit val tid: TransactionId = transid() + + val docId = DocId("no-existing-id") + val f = store.readAttachment(docId, "code", byteStringSink()) + + f.failed.futureValue shouldBe a[NoDocumentException] + } + + it should "not write an attachment when there is error in Source" in { + implicit val tid: TransactionId = transid() + + val docId = newDocId() + val error = new Error("boom!") + val faultySource = Source(1 to 10) + .map { n ⇒ + if (n == 7) throw error + n + } + .map(ByteString(_)) + val writeResult = 
store.attach(docId, "code", ContentTypes.`application/octet-stream`, faultySource) + writeResult.failed.futureValue.getCause should be theSameInstanceAs error + + val readResult = store.readAttachment(docId, "code", byteStringSink()) + readResult.failed.futureValue shouldBe a[NoDocumentException] + } + + override def afterAll(): Unit = { + if (garbageCollectAttachments) { + implicit val tid: TransactionId = transid() + val f = + Source(attachmentsToDelete.toList) + .mapAsync(2)(id => store.deleteAttachments(DocId(id))) + .runWith(Sink.ignore) + Await.result(f, 1.minute) + } + super.afterAll() + } + + protected def garbageCollect(docId: DocId): Unit = {} + + protected def newDocId(): DocId = { + //By default create an info with dummy revision + //as apart from CouchDB other stores do not support the revision property + //for blobs + counter = counter + 1 + val docId = s"${prefix}_$counter" + attachmentsToDelete += docId + DocId(docId) + } + + @volatile var counter = 0 + + private def attachmentBytes(id: DocId, name: String) = { + implicit val tid: TransactionId = transid() + store.readAttachment(id, name, byteStringSink()) + } + + private def chunkedSource(bytes: Array[Byte]): Source[ByteString, _] = { + StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42) + } + + private def byteStringSink() = { + Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b) + } +} diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala index 39ddad3..2b39fd3 100644 --- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala +++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala @@ -21,26 +21,23 @@ import java.util.Base64 import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.ListBuffer -import scala.concurrent.Await -import scala.concurrent.ExecutionContext 
-import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.DurationInt -import scala.language.postfixOps -import scala.util.{Failure, Random, Success, Try} -import spray.json._ +import akka.http.scaladsl.model.ContentType +import akka.stream.scaladsl.Source +import akka.util.ByteString import spray.json.DefaultJsonProtocol._ +import spray.json._ import whisk.common.TransactionId import whisk.core.database._ import whisk.core.database.memory.MemoryArtifactStore -import whisk.core.entity._ -import whisk.core.entity.types.AuthStore -import whisk.core.entity.types.EntityStore -import akka.http.scaladsl.model.ContentType -import akka.stream.scaladsl.Source -import akka.util.ByteString import whisk.core.entity.Attachments.Attached +import whisk.core.entity._ +import whisk.core.entity.types.{AuthStore, EntityStore} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.{Duration, DurationInt} +import scala.language.postfixOps +import scala.util.{Failure, Random, Success, Try} /** * WARNING: the put/get/del operations in this trait operate directly on the datastore, @@ -278,7 +275,10 @@ trait DbUtils { */ def cleanup()(implicit timeout: Duration = 10 seconds) = { docsToDelete.map { e => - Try(Await.result(e._1.del(e._2)(TransactionId.testing), timeout)) + Try { + Await.result(e._1.del(e._2)(TransactionId.testing), timeout) + Await.result(e._1.deleteAttachments(e._2)(TransactionId.testing), timeout) + } } docsToDelete.clear() } @@ -321,7 +321,7 @@ trait DbUtils { def isMemoryStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[MemoryArtifactStore[_]] def isCouchStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[CouchDbRestStore[_]] - private def randomBytes(size: Int): Array[Byte] = { + protected def randomBytes(size: Int): Array[Byte] = { val arr = new Array[Byte](size) Random.nextBytes(arr) arr diff --git 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala index 18083c3..de7a7a8 100644 --- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala +++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala @@ -18,10 +18,12 @@ package whisk.core.database.test.behavior import java.io.ByteArrayOutputStream +import java.util.Base64 import akka.http.scaladsl.model.{ContentTypes, Uri} import akka.stream.IOResult -import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.{Sink, StreamConverters} +import akka.util.{ByteString, ByteStringBuilder} import whisk.common.TransactionId import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException} import whisk.core.entity.Attachments.{Attached, Attachment, Inline} @@ -29,7 +31,7 @@ import whisk.core.entity.test.ExecHelpers import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction} trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers { - behavior of "Attachments" + behavior of s"${storeType}ArtifactStore attachments" private val namespace = newNS() private val attachmentHandler = Some(WhiskAction.attachmentHandler _) @@ -58,6 +60,57 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex attachmentUri.isAbsolute shouldBe true } + /** + * This test asserts that old attachments are deleted and cannot be read again + */ + it should "fail on reading with old non inlined attachment" in { + implicit val tid: TransactionId = transid() + val code1 = nonInlinedCode(entityStore) + val exec = javaDefault(code1, Some("hello")) + val javaAction = + WhiskAction(namespace, EntityName("attachment_update_2"), exec) + + val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue + + val 
action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue + val code2 = nonInlinedCode(entityStore) + val exec2 = javaDefault(code2, Some("hello")) + val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev) + + val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue + val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue + + docsToDelete += ((entityStore, i2)) + getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2) + getAttachmentBytes(i1, attached(action2)).failed.futureValue shouldBe a[NoDocumentException] + } + + /** + * Variant of previous test where read with old attachment should still work + * if attachment is inlined + */ + it should "work on reading with old inlined attachment" in { + implicit val tid: TransactionId = transid() + val code1 = encodedRandomBytes(inlinedAttachmentSize(entityStore)) + val exec = javaDefault(code1, Some("hello")) + val javaAction = + WhiskAction(namespace, EntityName("attachment_update_2"), exec) + + val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue + + val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue + val code2 = nonInlinedCode(entityStore) + val exec2 = javaDefault(code2, Some("hello")) + val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev) + + val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue + val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue + + docsToDelete += ((entityStore, i2)) + getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2) + getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe decode(code1) + } + it should "put and read same attachment" in { implicit val tid: TransactionId = transid() val size = nonInlinedAttachmentSize(entityStore) @@ -124,9 +177,44 @@ trait ArtifactStoreAttachmentBehaviors 
extends ArtifactStoreBehaviorBase with Ex .futureValue shouldBe a[NoDocumentException] } + it should "delete attachment on document delete" in { + val attachmentStore = getAttachmentStore(entityStore) + assume(attachmentStore.isDefined, "ArtifactStore does not have attachmentStore configured") + + implicit val tid: TransactionId = transid() + val size = nonInlinedAttachmentSize(entityStore) + val base64 = encodedRandomBytes(size) + + val exec = javaDefault(base64, Some("hello")) + val javaAction = + WhiskAction(namespace, EntityName("attachment_unique"), exec) + + val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue + val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue + + WhiskAction.del(entityStore, i1).futureValue shouldBe true + + val attachmentName = Uri(attached(action2).attachmentName).path.toString + attachmentStore.get + .readAttachment(i1.id, attachmentName, byteStringSink()) + .failed + .futureValue shouldBe a[NoDocumentException] + } + private def attached(a: WhiskAction): Attached = a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached] private def inlined(a: WhiskAction): Inline[String] = a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]] + + private def getAttachmentBytes(docInfo: DocInfo, attached: Attached) = { + implicit val tid: TransactionId = transid() + entityStore.readAttachment(docInfo, attached, byteStringSink()) + } + + private def byteStringSink() = { + Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b) + } + + private def decode(s: String): ByteString = ByteString(Base64.getDecoder.decode(s)) } diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala index e22063d..239bbfc 100644 --- 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala +++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala @@ -25,8 +25,9 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} import spray.json.{JsObject, JsValue} import whisk.common.TransactionId +import whisk.core.database.memory.MemoryAttachmentStore import whisk.core.database.test.DbUtils -import whisk.core.database.{ArtifactStore, StaleParameter} +import whisk.core.database.{ArtifactStore, AttachmentStore, StaleParameter} import whisk.core.entity._ import whisk.utils.JsHelpers @@ -61,11 +62,13 @@ trait ArtifactStoreBehaviorBase } override def afterAll(): Unit = { + assertAttachmentStoreIsEmpty() println("Shutting down store connections") authStore.shutdown() entityStore.shutdown() activationStore.shutdown() super.afterAll() + assertAttachmentStoresAreClosed() } //~----------------------------------------< utility methods > @@ -137,4 +140,30 @@ trait ArtifactStoreBehaviorBase protected def getJsField(js: JsObject, subObject: String, fieldName: String): JsValue = { js.fields(subObject).asJsObject().fields(fieldName) } + + protected def getAttachmentStore(store: ArtifactStore[_]): Option[AttachmentStore] + + protected def getAttachmentCount(store: AttachmentStore): Option[Int] = store match { + case s: MemoryAttachmentStore => Some(s.attachmentCount) + case _ => None + } + + private def assertAttachmentStoreIsEmpty(): Unit = { + Seq(authStore, entityStore, activationStore).foreach { s => + for { + as <- getAttachmentStore(s) + count <- getAttachmentCount(as) + } require(count == 0, s"AttachmentStore not empty after all runs - $count") + } + } + + private def assertAttachmentStoresAreClosed(): Unit = { + Seq(authStore, entityStore, activationStore).foreach { s => + getAttachmentStore(s).foreach { + case s: MemoryAttachmentStore => 
require(s.isClosed, "AttachmentStore was not closed") + case _ => + } + } + } + } -- To stop receiving notification emails like this one, please contact chet...@apache.org.