This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b46478b  Introduce an AttachmentStore SPI (#3453)
b46478b is described below

commit b46478bfe6b5068fd57f78cea44b2a6326e89f4b
Author: Chetan Mehrotra <chet...@apache.org>
AuthorDate: Thu Jun 14 10:35:08 2018 +0530

    Introduce an AttachmentStore SPI (#3453)
    
    Adds an SPI that defines how attachments can be stored in a blob/object store such as S3.
    
    CouchDbRestStore now supports storing attachments either in CouchDB itself or in an AttachmentStore.
    
    Attachment Storage Flow
    =======================
    
    When storing an attachment, the flow varies based on the storage type; a condensed sketch of the dispatch follows the two lists below.
    
    Storing in Couch
    ----------------
    
    1. Check whether the attachment can be inlined
    2. If not inlined, generate an attachment name
    3. Record the attachment name in the document and store the document in Couch
    4. If not inlined, attach the attachment to the stored document
    
    Storing in AttachmentStore
    --------------------------
    
    1. Check whether the attachment can be inlined
    2. If not inlined, generate an attachment name
    3. Upload the attachment to the AttachmentStore
    4. Record the attachment name in the document and store the document in Couch
    5. Delete the old attachment, if any
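
    In CouchDbRestStore this becomes a dispatch on whether an AttachmentStore is
    configured; a condensed sketch of the logic (the full implementation is in
    the CouchDbRestStore diff below):

        attachmentStore match {
          case Some(_) => attachToExternalStore(doc, update, contentType, docStream, oldAttachment)
          case None    => attachToCouch(doc, update, contentType, docStream)
        }

    Note the ordering in the external flow: the new attachment is uploaded
    before the document is updated, and the old attachment is deleted only
    afterwards, so a reader holding the previous document revision never sees
    a dangling attachment reference.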
    
    Attachment Read Flow
    ====================
    
    When reading an attachment (a condensed sketch follows this list):
    
    1. Determine the attachment scheme
    2. If inlined -> read the bytes directly and return
    3. If the scheme is
        1. 'couch' - read from Couch
        2. absent - the attachment uses the old naming format and is stored in Couch only, so read from Couch
    4. If the scheme matches the AttachmentStore scheme, read from the AttachmentStore
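
    In code, the read dispatch is a match on the parsed attachment URI scheme
    (condensed from CouchDbRestStore.readAttachment in the diff below):

        val attachmentUri = Uri(attached.attachmentName)
        attachmentUri.scheme match {
          case AttachmentInliner.MemScheme =>
            memorySource(attachmentUri).runWith(sink)
          case s if s == couchScheme || attachmentUri.isRelative =>
            readAttachmentFromCouch(doc, attachmentUri, sink)
          case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
            attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink)
          case _ =>
            throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri")
        }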
    
    Attachment Metadata
    ===================
    
    The AttachmentStore also computes the following metadata as part of the attach flow:

    1. length (in bytes)
    2. content hash, based on the SHA-256 MessageDigest algorithm
    
    Both values are computed in a single pass over the stream using a multi-way broadcast flow, sketched below.
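
    In StoreUtils this is a Broadcast stage that fans each ByteString chunk out
    to a digest sink, a length sink, and the actual upload sink, so the
    attachment bytes are traversed only once (condensed from
    StoreUtils.combinedSink in the diff below):

        Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
          implicit builder => (dgs, ls, dests) =>
            import GraphDSL.Implicits._
            val bcast = builder.add(Broadcast[ByteString](3))
            bcast ~> dgs.in
            bcast ~> ls.in
            bcast ~> dests.in
            SinkShape(bcast.in)
        })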
---
 .../src/main/scala/whisk/common/Logging.scala      |   2 +
 .../core/database/ArtifactStoreProvider.scala      |  14 +-
 .../whisk/core/database/AttachmentInliner.scala    |   8 +-
 .../whisk/core/database/AttachmentStore.scala      |  71 ++++++++
 .../whisk/core/database/CouchDbRestStore.scala     | 146 ++++++++++++-----
 .../whisk/core/database/CouchDbStoreProvider.scala |  11 +-
 .../scala/whisk/core/database/StoreUtils.scala     |  54 ++++++-
 .../core/database/memory/MemoryArtifactStore.scala | 111 +++++--------
 .../database/memory/MemoryAttachmentStore.scala    | 139 ++++++++++++++++
 .../main/scala/whisk/core/entity/Attachments.scala |  10 +-
 .../core/database/CouchDBArtifactStoreTests.scala  |  26 +--
 .../database/CouchDBAttachmentStoreTests.scala     |  25 ++-
 ...eTests.scala => CouchDBStoreBehaviorBase.scala} |  29 ++--
 .../database/memory/MemoryArtifactStoreTests.scala |   4 +
 ...ests.scala => MemoryAttachmentStoreTests.scala} |  32 ++--
 .../test/AttachmentCompatibilityTests.scala        |  44 ++++-
 .../database/test/AttachmentStoreBehaviors.scala   | 178 +++++++++++++++++++++
 .../scala/whisk/core/database/test/DbUtils.scala   |  34 ++--
 .../ArtifactStoreAttachmentBehaviors.scala         |  92 ++++++++++-
 .../test/behavior/ArtifactStoreBehaviorBase.scala  |  31 +++-
 20 files changed, 846 insertions(+), 215 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 0e21fe3..a82e56b 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -312,5 +312,7 @@ object LoggingMarkers {
   val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
   val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
   val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
+  val DATABASE_ATT_DELETE = LogMarkerToken(database, "deleteDocumentAttachment", start)
+  val DATABASE_ATTS_DELETE = LogMarkerToken(database, "deleteDocumentAttachments", start)
   val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
index fdcc306..32d6150 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
@@ -19,9 +19,10 @@ package whisk.core.database
 
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
 import spray.json.RootJsonFormat
 import whisk.common.Logging
-import whisk.spi.Spi
+import whisk.spi.{Spi, SpiLoader}
 import whisk.core.entity.DocumentReader
 
 import scala.reflect.ClassTag
@@ -36,4 +37,15 @@ trait ArtifactStoreProvider extends Spi {
     actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D]
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]()(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): Option[AttachmentStore] = {
+    if (ConfigFactory.load().hasPath("whisk.spi.AttachmentStoreProvider")) {
+      Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]())
+    } else {
+      None
+    }
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
index 14eb192..2369b1b 100644
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
@@ -17,7 +17,6 @@
 
 package whisk.core.database
 
-import java.security.MessageDigest
 import java.util.Base64
 
 import akka.NotUsed
@@ -46,8 +45,6 @@ case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
  * name itself.
  */
 trait AttachmentInliner {
-  private val digestAlgo = "SHA-256"
-  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
 
   /** Materializer required for stream processing */
   protected[core] implicit val materializer: Materializer
@@ -94,10 +91,9 @@ trait AttachmentInliner {
  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == MemScheme
 
   protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
-    val digester = MessageDigest.getInstance(digestAlgo)
+    val digester = StoreUtils.emptyDigest()
     digester.update(bytes.toArray)
-    val digest = digester.digest().map("%02x".format(_)).mkString
-    s"$encodedAlgoName-$digest"
+    StoreUtils.encodeDigest(digester.digest())
   }
 
   /**
diff --git a/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala
new file mode 100644
index 0000000..e606f1d
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentStore.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package whisk.core.database
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import whisk.common.{Logging, TransactionId}
+import whisk.core.entity.DocId
+import whisk.spi.Spi
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+trait AttachmentStoreProvider extends Spi {
+  def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                     logging: Logging,
+                                                     materializer: ActorMaterializer): AttachmentStore
+}
+
+case class AttachResult(digest: String, length: Long)
+
+trait AttachmentStore {
+
+  /** Identifies the store type */
+  protected[core] def scheme: String
+
+  /** Execution context for futures */
+  protected[core] implicit val executionContext: ExecutionContext
+
+  /**
+   * Attaches a "file" of type `contentType` to an existing document.
+   */
+  protected[core] def attach(doc: DocId, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
+    implicit transid: TransactionId): Future[AttachResult]
+
+  /**
+   * Retrieves a saved attachment, streaming it into the provided Sink.
+   */
+  protected[core] def readAttachment[T](doc: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T]
+
+  /**
+   * Deletes all attachments linked to given document
+   */
+  protected[core] def deleteAttachments(doc: DocId)(implicit transid: TransactionId): Future[Boolean]
+
+  /**
+   * Deletes specific attachment.
+   */
+  protected[core] def deleteAttachment(doc: DocId, name: String)(implicit transid: TransactionId): Future[Boolean]
+
+  /** Shut it down. After this invocation, every other call is invalid. */
+  def shutdown(): Unit
+}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index aff7e85..44579d3 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -27,7 +27,7 @@ import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils._
 import whisk.core.entity.Attachments.Attached
-import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
+import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
 import scala.concurrent.duration._
@@ -52,7 +52,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
                                                                  dbPassword: String,
                                                                  dbName: String,
                                                                  useBatching: Boolean = false,
-                                                                  val inliningConfig: InliningConfig)(
+                                                                  val inliningConfig: InliningConfig,
+                                                                  val attachmentStore: Option[AttachmentStore])(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
@@ -64,7 +65,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
 
  protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
-  val attachmentScheme: String = "couch"
+  private val couchScheme = "couch"
+  val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(couchScheme)
 
   private val client: CouchDbRestClient =
    new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
@@ -351,12 +353,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   }
 
   override protected[database] def putAndAttach[A <: DocumentAbstraction](
-    d: A,
+    doc: A,
     update: (A, Attached) => A,
     contentType: ContentType,
     docStream: Source[ByteString, _],
    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
+    attachmentStore match {
+      case Some(_) =>
+        attachToExternalStore(doc, update, contentType, docStream, oldAttachment)
+      case None =>
+        attachToCouch(doc, update, contentType, docStream)
+    }
+  }
+
+  private def attachToCouch[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId) = {
     for {
       (bytes, tailSource) <- inlineAndTail(docStream)
       uri <- Future.successful(uriOf(bytes, UUID().asString))
@@ -368,13 +383,56 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
         }
         Future.successful(a)
       }
-      i1 <- put(update(d, attached))
-      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
+      i1 <- put(update(doc, attached))
+      i2 <- if (isInlined(uri)) {
+        Future.successful(i1)
+      } else {
        attach(i1, uri.path.toString, attached.attachmentType, combinedSource(bytes, tailSource))
       }
     } yield (i2, attached)
   }
 
+  private def attachToExternalStore[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId) = {
+    val as = attachmentStore.get
+    val asJson = doc.toDocumentRecord
+    val id = asJson.fields("_id").convertTo[String].trim
+
+    for {
+      (bytes, tailSource) <- inlineAndTail(docStream)
+      uri <- Future.successful(uriOf(bytes, UUID().asString))
+      attached <- {
+        // Upload if cannot be inlined
+        if (isInlined(uri)) {
+          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+          Future.successful(a)
+        } else {
+          as.attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
+            .map { r =>
+              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
+            }
+        }
+      }
+      i1 <- put(update(doc, attached))
+
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == as.scheme) {
+            as.deleteAttachment(DocId(id), oldUri.path.toString)
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
+  }
+
  private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
@@ -421,8 +479,26 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
 
  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
     implicit transid: TransactionId): Future[T] = {
-
     val name = attached.attachmentName
+    val attachmentUri = Uri(name)
+    attachmentUri.scheme match {
+      case AttachmentInliner.MemScheme =>
+        memorySource(attachmentUri).runWith(sink)
+      case s if s == couchScheme || attachmentUri.isRelative =>
+        //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile'
+        //Compared to current approach of '<scheme>:<name>'
+        readAttachmentFromCouch(doc, attachmentUri, sink)
+      case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
+        attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink)
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri")
+    }
+  }
+
+  private def readAttachmentFromCouch[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val name = attachmentUri.path
     val start = transid.started(
       this,
       LoggingMarkers.DATABASE_ATT_GET,
@@ -431,31 +507,28 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     require(doc != null, "doc undefined")
     require(doc.rev.rev != null, "doc revision must be specified")
 
-    val attachmentUri = Uri(name)
-    val g = if (isInlined(attachmentUri)) {
-      memorySource(attachmentUri).runWith(sink)
-    } else {
-      val f = client.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
-      f.map {
-        case Right((_, result)) =>
-          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-          result
-
-        case Left(StatusCodes.NotFound) =>
-          transid.finished(
-            this,
-            start,
-            s"[ATT_GET] '$dbName', retrieving attachment '$name' of document 
'$doc'; not found.")
-          throw NoDocumentException("Not found on 'readAttachment'.")
-
-        case Left(code) =>
-          transid.failed(
-            this,
-            start,
-            s"[ATT_GET] '$dbName' failed to get attachment '$name' of document 
'$doc'; http status: '${code}'")
-          throw new Exception("Unexpected http response code: " + code)
-      }
-    }
+    val g =
+      client
+        .getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
+        .map {
+          case Right((_, result)) =>
+            transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
+            result
+
+          case Left(StatusCodes.NotFound) =>
+            transid.finished(
+              this,
+              start,
+              s"[ATT_GET] '$dbName', retrieving attachment '$name' of document 
'$doc'; not found.")
+            throw NoDocumentException("Not found on 'readAttachment'.")
+
+          case Left(code) =>
+            transid.failed(
+              this,
+              start,
+              s"[ATT_GET] '$dbName' failed to get attachment '$name' of 
document '$doc'; http status: '$code'")
+            throw new Exception("Unexpected http response code: " + code)
+        }
 
     reportFailure(
       g,
@@ -468,12 +541,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   }
 
  override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
-    // NOTE: this method is not intended for standalone use for CouchDB.
-    // To delete attachments, it is expected that the entire document is deleted.
-    Future.successful(true)
+    attachmentStore
+      .map(as => as.deleteAttachments(doc.id))
+      .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted.
 
   override def shutdown(): Unit = {
     Await.ready(client.shutdown(), 1.minute)
+    attachmentStore.foreach(_.shutdown())
   }
 
   private def processAttachments[A <: DocumentAbstraction](doc: A,
@@ -492,7 +566,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
               }
               attachmentHandler(
                 doc,
-                Attached(getAttachmentName(name), contentType, Some(length.intValue()), Some(digest)))
+                Attached(getAttachmentName(name), contentType, Some(length.longValue()), Some(digest)))
             case x =>
              throw DeserializationException("Attachment json does not have required fields" + x)
 
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
index df6a374..1fc11a1 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala
@@ -53,6 +53,14 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
     docReader: DocumentReader,
     actorSystem: ActorSystem,
     logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = makeArtifactStore(useBatching, getAttachmentStore())
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean,
+                                                           attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D] = {
     val dbConfig = loadConfigOrThrow[CouchDbConfig](ConfigKeys.couchdb)
     require(
@@ -69,6 +77,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider {
       dbConfig.password,
       dbConfig.databaseFor[D],
       useBatching,
-      inliningConfig)
+      inliningConfig,
+      attachmentStore)
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
index cff9dd9..f6351d1 100644
--- a/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/database/StoreUtils.scala
@@ -17,15 +17,22 @@
 
 package whisk.core.database
 
+import java.security.MessageDigest
+
 import akka.event.Logging.ErrorLevel
+import akka.stream.SinkShape
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink}
+import akka.util.ByteString
+import spray.json.DefaultJsonProtocol._
 import spray.json.{JsObject, RootJsonFormat}
 import whisk.common.{Logging, StartMarker, TransactionId}
-import spray.json.DefaultJsonProtocol._
 import whisk.core.entity.{DocInfo, DocRevision, DocumentReader, WhiskDocument}
 
 import scala.concurrent.{ExecutionContext, Future}
 
 private[database] object StoreUtils {
+  private val digestAlgo = "SHA-256"
+  private val encodedAlgoName = digestAlgo.toLowerCase.replaceAllLiterally("-", "")
 
  def reportFailure[T](f: Future[T], start: StartMarker, failureMessage: Throwable => String)(
     implicit transid: TransactionId,
@@ -65,4 +72,49 @@ private[database] object StoreUtils {
    // FIXME remove mutability from appropriate classes now that it is no longer required by GSON.
     deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev))
   }
+
+  def combinedSink[T](dest: Sink[ByteString, Future[T]])(
+    implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
+    Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
+      implicit builder => (dgs, ls, dests) =>
+        import GraphDSL.Implicits._
+
+        val bcast = builder.add(Broadcast[ByteString](3))
+
+        bcast ~> dgs.in
+        bcast ~> ls.in
+        bcast ~> dests.in
+
+        SinkShape(bcast.in)
+    })
+  }
+
+  def emptyDigest(): MessageDigest = MessageDigest.getInstance(digestAlgo)
+
+  def encodeDigest(bytes: Array[Byte]): String = {
+    val digest = bytes.map("%02x".format(_)).mkString
+    s"$encodedAlgoName-$digest"
+  }
+
+  private def combineResult[T](digest: Future[String], length: Future[Long], upload: Future[T])(
+    implicit ec: ExecutionContext) = {
+    for {
+      d <- digest
+      l <- length
+      u <- upload
+    } yield AttachmentUploadResult(d, l, u)
+  }
+
+  case class AttachmentUploadResult[T](digest: String, length: Long, uploadResult: T)
+
+  private def digestSink(): Sink[ByteString, Future[String]] = {
+    Flow[ByteString]
+      .fold(emptyDigest())((digest, bytes) => { digest.update(bytes.toArray); digest })
+      .map(md => encodeDigest(md.digest()))
+      .toMat(Sink.head)(Keep.right)
+  }
+
+  private def lengthSink(): Sink[ByteString, Future[Long]] = {
+    Sink.fold[Long, ByteString](0)((length, bytes) => length + bytes.size)
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index 75973ff..140b854 100644
--- a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -20,8 +20,8 @@ package whisk.core.database.memory
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{ContentType, Uri}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.util.{ByteString, ByteStringBuilder}
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
 import pureconfig.loadConfigOrThrow
 import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsString, RootJsonFormat}
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
@@ -45,11 +45,20 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
     actorSystem: ActorSystem,
     logging: Logging,
     materializer: ActorMaterializer): ArtifactStore[D] = {
+    makeArtifactStore(MemoryAttachmentStoreProvider.makeStore())
+  }
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](attachmentStore: AttachmentStore)(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
 
     val classTag = implicitly[ClassTag[D]]
     val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
     val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
-    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig)
+    new MemoryArtifactStore(dbName, handler, viewMapper, inliningConfig, attachmentStore)
   }
 
   private def handlerAndMapper[D](entityType: ClassTag[D])(
@@ -75,7 +84,8 @@ object MemoryArtifactStoreProvider extends ArtifactStoreProvider {
 class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: String,
                                                                      documentHandler: DocumentHandler,
                                                                      viewMapper: MemoryViewMapper,
-                                                                     val inliningConfig: InliningConfig)(
+                                                                     val inliningConfig: InliningConfig,
+                                                                     val attachmentStore: AttachmentStore)(
   implicit system: ActorSystem,
   val logging: Logging,
   jsonFormat: RootJsonFormat[DocumentAbstraction],
@@ -92,7 +102,7 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
   private val _id = "_id"
   private val _rev = "_rev"
-  val attachmentScheme = "mems"
+  val attachmentScheme: String = attachmentStore.scheme
 
  override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
     val asJson = d.toDocumentRecord
@@ -248,7 +258,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
 
  override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
     implicit transid: TransactionId): Future[T] = {
-    //TODO Temporary implementation till MemoryAttachmentStore PR is merged
     val name = attached.attachmentName
     val start = transid.started(
       this,
@@ -260,20 +269,17 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
       memorySource(attachmentUri).runWith(sink)
     } else {
       val storedName = attachmentUri.path.toString()
-      artifacts.get(doc.id.id) match {
-        case Some(a: Artifact) if a.attachments.contains(storedName) =>
-          val attachment = a.attachments(storedName)
-          val r = Source.single(attachment.bytes).toMat(sink)(Keep.right).run
+      val f = attachmentStore.readAttachment(doc.id, storedName, sink)
+      f.onSuccess {
+        case _ =>
           transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
-          r
-        case None =>
-          Future.failed(NoDocumentException("Not found on 'readAttachment'."))
       }
+      f
     }
   }
 
  override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {
-    Future.successful(true)
+    attachmentStore.deleteAttachments(doc.id)
   }
 
   override protected[database] def putAndAttach[A <: DocumentAbstraction](
@@ -283,56 +289,42 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     docStream: Source[ByteString, _],
    oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
 
+    val asJson = d.toDocumentRecord
+    val id = asJson.fields(_id).convertTo[String].trim
    //Inlined attachment with Memory storage is not required. However to validate the constructs
     //inlined support is implemented
     for {
-      allBytes <- toByteString(docStream)
-      (bytes, tailSource) <- inlineAndTail(Source.single(allBytes))
+      (bytes, tailSource) <- inlineAndTail(docStream)
       uri <- Future.successful(uriOf(bytes, UUID().asString))
       attached <- {
-        val a = if (isInlined(uri)) {
-          Attached(uri.toString(), contentType, Some(bytes.size), Some(digest(bytes)))
+        if (isInlined(uri)) {
+          val a = Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
+          Future.successful(a)
         } else {
-          Attached(uri.toString(), contentType, Some(allBytes.size), Some(digest(allBytes)))
+          attachmentStore
+            .attach(DocId(id), uri.path.toString, contentType, combinedSource(bytes, tailSource))
+            .map { r =>
+              Attached(uri.toString, contentType, Some(r.length), Some(r.digest))
+            }
         }
-        Future.successful(a)
       }
       i1 <- put(update(d, attached))
-      i2 <- if (isInlined(uri)) { Future.successful(i1) } else {
-        attach(i1, uri.path.toString(), attached.attachmentType, toByteString(combinedSource(bytes, tailSource)))
-      }
-    } yield (i2, attached)
-  }
-
-  private def attach(doc: DocInfo, name: String, contentType: ContentType, bytes: Future[ByteString])(
-    implicit transid: TransactionId): Future[DocInfo] = {
-
-    val start = transid.started(
-      this,
-      LoggingMarkers.DATABASE_ATT_SAVE,
-      s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
-
-    //TODO Temporary implementation till MemoryAttachmentStore PR is merged
-    bytes.map { b =>
-      artifacts.get(doc.id.id) match {
-        case Some(a) =>
-          val existing = Artifact(doc, a.doc, a.computed)
-          val updated = existing.attach(name, Attachment(b, contentType))
-          if (artifacts.replace(doc.id.id, existing, updated)) {
-            transid
-              .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
-            updated.docInfo
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == attachmentStore.scheme) {
+            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
           } else {
-            throw DocumentConflictException("conflict on 'put'")
+            Future.successful(true)
           }
-        case None =>
-          throw DocumentConflictException("conflict on 'put'")
-      }
-    }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
   }
 
   override def shutdown(): Unit = {
     artifacts.clear()
+    attachmentStore.shutdown()
   }
 
  override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = {
@@ -354,9 +346,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
    reportFailure(f, start, failure => s"[GET] '$dbName' internal error, doc: '$id', failure: '${failure.getMessage}'")
   }
 
-  private def toByteString(docStream: Source[Traversable[Byte], _]) =
-    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= b).map(_.result().compact)
-
   private def getRevision(asJson: JsObject) = {
     asJson.fields.get(_rev) match {
       case Some(JsString(r)) => r.toInt
@@ -367,21 +356,14 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
   //Use curried case class to allow equals support only for id and rev
   //This allows us to implement atomic replace and remove which check
   //for id,rev equality only
-  private case class Artifact(id: String, rev: Int)(val doc: JsObject,
-                                                    val computed: JsObject,
-                                                    val attachments: Map[String, Attachment] = Map.empty) {
+  private case class Artifact(id: String, rev: Int)(val doc: JsObject, val computed: JsObject) {
     def incrementRev(): Artifact = {
       val (newRev, updatedDoc) = incrementAndGet()
-      copy(rev = newRev)(updatedDoc, computed, Map.empty) //With Couch attachments are lost post update
+      copy(rev = newRev)(updatedDoc, computed) //With Couch attachments are lost post update
     }
 
     def docInfo = DocInfo(DocId(id), DocRevision(rev.toString))
 
-    def attach(name: String, attachment: Attachment): Artifact = {
-      val (newRev, updatedDoc) = incrementAndGet()
-      copy(rev = newRev)(updatedDoc, computed, attachments + (name -> attachment))
-    }
-
     private def incrementAndGet() = {
       val newRev = rev + 1
      val updatedDoc = JsObject(doc.fields + (_rev -> JsString(newRev.toString)))
@@ -389,8 +371,6 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     }
   }
 
-  private case class Attachment(bytes: ByteString, contentType: ContentType)
-
   private object Artifact {
     def apply(id: String, rev: Int, doc: JsObject): Artifact = {
       Artifact(id, rev)(doc, documentHandler.computedFields(doc))
@@ -399,10 +379,5 @@ class MemoryArtifactStore[DocumentAbstraction <: DocumentSerializer](dbName: Str
     def apply(info: DocInfo): Artifact = {
       Artifact(info.id.id, info.rev.rev.toInt)(JsObject.empty, JsObject.empty)
     }
-
-    def apply(info: DocInfo, doc: JsObject, c: JsObject): Artifact = {
-      Artifact(info.id.id, info.rev.rev.toInt)(doc, c)
-    }
   }
-
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
new file mode 100644
index 0000000..face871
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.memory
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.ContentType
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.util.{ByteString, ByteStringBuilder}
+import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
+import whisk.common.{Logging, TransactionId}
+import whisk.core.database.StoreUtils._
+import whisk.core.database._
+import whisk.core.entity.DocId
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+
+object MemoryAttachmentStoreProvider extends AttachmentStoreProvider {
+  override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
+                                                              logging: Logging,
+                                                              materializer: ActorMaterializer): AttachmentStore =
+    new MemoryAttachmentStore(implicitly[ClassTag[D]].runtimeClass.getSimpleName.toLowerCase)
+}
+
+/**
+ * Basic in-memory AttachmentStore implementation. Useful for testing.
+ */
+class MemoryAttachmentStore(dbName: String)(implicit system: ActorSystem,
+                                            logging: Logging,
+                                            materializer: ActorMaterializer)
+    extends AttachmentStore {
+
+  override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher
+
+  private case class Attachment(bytes: ByteString)
+
+  private val attachments = new TrieMap[String, Attachment]
+  private var closed = false
+
+  override val scheme = "mems"
+
+  override protected[core] def attach(
+    docId: DocId,
+    name: String,
+    contentType: ContentType,
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
+    require(name != null, "name undefined")
+    val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading 
attachment '$name' of document '$docId'")
+
+    val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+
+    val f = docStream.runWith(combinedSink(uploadSink))
+
+    val g = f.map { r =>
+      attachments += (attachmentKey(docId, name) -> Attachment(r.uploadResult.result().compact))
+      transid
+        .finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$docId'")
+      AttachResult(r.digest, r.length)
+    }
+
+    reportFailure(
+      g,
+      start,
+      failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 
'$docId', failure: '${failure.getMessage}'")
+  }
+
+  /**
+   * Retrieves a saved attachment, streaming it into the provided Sink.
+   */
+  override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val start =
+      transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding attachment '$name' of document '$docId'")
+
+    val f = attachments.get(attachmentKey(docId, name)) match {
+      case Some(Attachment(bytes)) =>
+        val r = Source.single(bytes).toMat(sink)(Keep.right).run
+        r.map(t => {
+          transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$docId'")
+          t
+        })
+      case None =>
+        transid.finished(
+          this,
+          start,
+          s"[ATT_GET] '$dbName', retrieving attachment '$name' of document 
'$docId'; not found.")
+        Future.failed(NoDocumentException("Not found on 'readAttachment'."))
+    }
+    reportFailure(
+      f,
+      start,
+      failure => s"[ATT_GET] '$dbName' internal error, name: '$name', doc: 
'$docId', failure: '${failure.getMessage}'")
+  }
+
+  override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
+    val start = transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document '$docId'")
+
+    val prefix = docId + "/"
+    attachments --= attachments.keySet.filter(_.startsWith(prefix))
+    transid.finished(this, start, s"[ATTS_DELETE] completed: delete attachment 
of document '$docId'")
+    Future.successful(true)
+  }
+
+  override protected[core] def deleteAttachment(docId: DocId, name: String)(
+    implicit transid: TransactionId): Future[Boolean] = {
+    val start = transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment of document '$docId'")
+    attachments.remove(attachmentKey(docId, name))
+    transid.finished(this, start, s"[ATT_DELETE] completed: delete attachment 
of document '$docId'")
+    Future.successful(true)
+  }
+
+  def attachmentCount: Int = attachments.size
+
+  def isClosed = closed
+
+  override def shutdown(): Unit = {
+    closed = true
+  }
+
+  private def attachmentKey(docId: DocId, name: String) = s"${docId.id}/$name"
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
index 6ec9bae..2d00a03 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Attachments.scala
@@ -17,15 +17,13 @@
 
 package whisk.core.entity
 
-import scala.util.Try
-
 import akka.http.scaladsl.model.ContentType
-
-import spray.json._
 import spray.json.DefaultJsonProtocol._
-
+import spray.json._
 import whisk.core.entity.size._
 
+import scala.util.Try
+
 object Attachments {
 
   /**
@@ -43,7 +41,7 @@ object Attachments {
 
   case class Attached(attachmentName: String,
                       attachmentType: ContentType,
-                      length: Option[Int] = None,
+                      length: Option[Long] = None,
                       digest: Option[String] = None)
       extends Attachment[Nothing]
 
diff --git a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
index a36fd30..159ac5b 100644
--- a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
@@ -21,30 +21,6 @@ import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
-
-import scala.reflect.classTag
 
 @RunWith(classOf[JUnitRunner])
-class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
-  override def storeType = "CouchDB"
-
-  override val authStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskAuth]()
-  }
-
-  override val entityStore =
-    CouchDbStoreProvider.makeStore[WhiskEntity]()(
-      classTag[WhiskEntity],
-      WhiskEntityJsonFormat,
-      WhiskDocumentReader,
-      actorSystem,
-      logging,
-      materializer)
-
-  override val activationStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskActivation]()
-  }
-}
+class CouchDBArtifactStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreBehavior {}
diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala
similarity index 60%
copy from common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
copy to tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala
index fdcc306..e0d7986 100644
--- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala
+++ b/tests/src/test/scala/whisk/core/database/CouchDBAttachmentStoreTests.scala
@@ -17,23 +17,16 @@
 
 package whisk.core.database
 
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import spray.json.RootJsonFormat
-import whisk.common.Logging
-import whisk.spi.Spi
-import whisk.core.entity.DocumentReader
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import whisk.core.database.memory.MemoryAttachmentStoreProvider
+import whisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
 
 import scala.reflect.ClassTag
 
-/**
- * An Spi for providing ArtifactStore implementations
- */
-trait ArtifactStoreProvider extends Spi {
-  def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean = false)(
-    implicit jsonFormat: RootJsonFormat[D],
-    docReader: DocumentReader,
-    actorSystem: ActorSystem,
-    logging: Logging,
-    materializer: ActorMaterializer): ArtifactStore[D]
+@RunWith(classOf[JUnitRunner])
+class CouchDBAttachmentStoreTests extends FlatSpec with CouchDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors {
+  override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() =
+    Some(MemoryAttachmentStoreProvider.makeStore[D]())
 }
diff --git a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala
similarity index 58%
copy from tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
copy to tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala
index a36fd30..0a13075 100644
--- a/tests/src/test/scala/whisk/core/database/CouchDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/CouchDBStoreBehaviorBase.scala
@@ -17,25 +17,29 @@
 
 package whisk.core.database
 
-import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
-import org.scalatest.junit.JUnitRunner
-import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
+import whisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import whisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskAuth,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat
+}
 
-import scala.reflect.classTag
+import scala.reflect.{classTag, ClassTag}
 
-@RunWith(classOf[JUnitRunner])
-class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
+trait CouchDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
   override def storeType = "CouchDB"
 
   override val authStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskAuth]()
+    CouchDbStoreProvider.makeArtifactStore[WhiskAuth](useBatching = false, getAttachmentStore[WhiskAuth]())
   }
 
   override val entityStore =
-    CouchDbStoreProvider.makeStore[WhiskEntity]()(
+    CouchDbStoreProvider.makeArtifactStore[WhiskEntity](useBatching = false, getAttachmentStore[WhiskEntity]())(
       classTag[WhiskEntity],
       WhiskEntityJsonFormat,
       WhiskDocumentReader,
@@ -45,6 +49,11 @@ class CouchDBArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
 
   override val activationStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader
-    CouchDbStoreProvider.makeStore[WhiskActivation]()
+    CouchDbStoreProvider.makeArtifactStore[WhiskActivation](useBatching = true, getAttachmentStore[WhiskActivation]())
   }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    store.asInstanceOf[CouchDbRestStore[_]].attachmentStore
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): Option[AttachmentStore] = None
 }
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
index 9d4643f..c47daaa 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
@@ -20,6 +20,7 @@ package whisk.core.database.memory
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
+import whisk.core.database.ArtifactStore
 import whisk.core.database.test.behavior.ArtifactStoreBehavior
 import whisk.core.entity._
 
@@ -47,4 +48,7 @@ class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
     implicit val docReader: DocumentReader = WhiskDocumentReader
     MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
   }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    Some(store.asInstanceOf[MemoryArtifactStore[_]].attachmentStore)
 }
diff --git a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala
similarity index 55%
copy from tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
copy to tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala
index 9d4643f..9d845bc 100644
--- a/tests/src/test/scala/whisk/core/database/memory/MemoryArtifactStoreTests.scala
+++ b/tests/src/test/scala/whisk/core/database/memory/MemoryAttachmentStoreTests.scala
@@ -17,34 +17,24 @@
 
 package whisk.core.database.memory
 
+import common.WskActorSystem
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
-import whisk.core.database.test.behavior.ArtifactStoreBehavior
-import whisk.core.entity._
-
-import scala.reflect.classTag
+import whisk.core.database.AttachmentStore
+import whisk.core.database.test.AttachmentStoreBehaviors
+import whisk.core.entity.WhiskEntity
 
 @RunWith(classOf[JUnitRunner])
-class MemoryArtifactStoreTests extends FlatSpec with ArtifactStoreBehavior {
-  override def storeType = "Memory"
+class MemoryAttachmentStoreTests extends FlatSpec with AttachmentStoreBehaviors with WskActorSystem {
 
-  override val authStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskAuth]()
-  }
+  override val store: AttachmentStore = MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()
 
-  override val entityStore =
-    MemoryArtifactStoreProvider.makeStore[WhiskEntity]()(
-      classTag[WhiskEntity],
-      WhiskEntityJsonFormat,
-      WhiskDocumentReader,
-      actorSystem,
-      logging,
-      materializer)
+  override def storeType: String = "Memory"
 
-  override val activationStore = {
-    implicit val docReader: DocumentReader = WhiskDocumentReader
-    MemoryArtifactStoreProvider.makeStore[WhiskActivation]()
+  override def afterAll(): Unit = {
+    super.afterAll()
+    val count = store.asInstanceOf[MemoryAttachmentStore].attachmentCount
+    require(count == 0, s"AttachmentStore not empty after all runs - $count")
   }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
index 6550cc0..dc8969f 100644
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentCompatibilityTests.scala
@@ -33,7 +33,8 @@ import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol
 import whisk.common.TransactionId
 import whisk.core.ConfigKeys
-import whisk.core.database.{CouchDbConfig, CouchDbRestClient, NoDocumentException}
+import whisk.core.database.memory.MemoryAttachmentStoreProvider
+import whisk.core.database.{CouchDbConfig, CouchDbRestClient, CouchDbStoreProvider, NoDocumentException}
 import whisk.core.entity.Attachments.Inline
 import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{
@@ -42,11 +43,14 @@ import whisk.core.entity.{
   EntityName,
   EntityPath,
   WhiskAction,
+  WhiskDocumentReader,
   WhiskEntity,
+  WhiskEntityJsonFormat,
   WhiskEntityStore
 }
 
 import scala.concurrent.Future
+import scala.reflect.classTag
 
 @RunWith(classOf[JUnitRunner])
 class AttachmentCompatibilityTests
@@ -93,6 +97,28 @@ class AttachmentCompatibilityTests
     val doc =
       WhiskAction(namespace, EntityName("attachment_unique"), exec)
 
+    createAction(doc)
+
+    val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue
+    doc2.exec shouldBe exec
+  }
+
+  it should "read attachments created using old scheme with AttachmentStore" 
in {
+    implicit val tid: TransactionId = transid()
+    val namespace = EntityPath("attachment-compat-test2")
+    val exec = javaDefault("ZHViZWU=", Some("hello"))
+    val doc =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    createAction(doc)
+
+    val entityStore2 = createEntityStore()
+    val doc2 = WhiskAction.get(entityStore2, doc.docid).futureValue
+    doc2.exec shouldBe exec
+  }
+
+  private def createAction(doc: WhiskAction) = {
+    implicit val tid: TransactionId = transid()
     doc.exec match {
       case exec @ CodeExecAsAttachment(_, Inline(code), _) =>
         val attached = exec.manifest.attached.get
@@ -109,9 +135,6 @@ class AttachmentCompatibilityTests
       case _ =>
         fail("Exec must be code attachment")
     }
-
-    val doc2 = WhiskAction.get(entityStore, doc.docid).futureValue
-    doc2.exec shouldBe exec
   }
 
   private def attach(doc: DocInfo,
@@ -131,4 +154,17 @@ class AttachmentCompatibilityTests
         throw new Exception("Unexpected http response code: " + code)
     }
   }
+
+  private def createEntityStore() =
+    CouchDbStoreProvider
+      .makeArtifactStore[WhiskEntity](
+        useBatching = false,
+        Some(MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()))(
+        classTag[WhiskEntity],
+        WhiskEntityJsonFormat,
+        WhiskDocumentReader,
+        actorSystem,
+        logging,
+        materializer)
+
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala
new file mode 100644
index 0000000..0b5a879
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database.test
+
+import java.io.ByteArrayInputStream
+
+import akka.http.scaladsl.model.ContentTypes
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.{ByteString, ByteStringBuilder}
+import common.{StreamLogging, WskActorSystem}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import whisk.common.TransactionId
+import whisk.core.database.{AttachmentStore, NoDocumentException}
+import whisk.core.entity.DocId
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
+trait AttachmentStoreBehaviors
+    extends ScalaFutures
+    with DbUtils
+    with Matchers
+    with StreamLogging
+    with WskActorSystem
+    with BeforeAndAfterAll {
+  this: FlatSpec =>
+
+  //Bring in sync the timeout used by ScalaFutures and DBUtils
+  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = dbOpTimeout)
+
+  protected implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  protected val prefix = s"attachmentTCK_${Random.alphanumeric.take(4).mkString}"
+
+  private val attachmentsToDelete = ListBuffer[String]()
+
+  def store: AttachmentStore
+
+  def storeType: String
+
+  def garbageCollectAttachments: Boolean = true
+
+  behavior of s"$storeType AttachmentStore"
+
+  it should "add and read attachment" in {
+    implicit val tid: TransactionId = transid()
+    val bytes = randomBytes(16023)
+
+    val docId = newDocId()
+    val result = store.attach(docId, "code", ContentTypes.`application/octet-stream`, chunkedSource(bytes)).futureValue
+
+    result.length shouldBe 16023
+
+    val byteBuilder = store.readAttachment(docId, "code", byteStringSink()).futureValue
+
+    byteBuilder.result() shouldBe ByteString(bytes)
+    garbageCollect(docId)
+  }
+
+  it should "add and delete attachments" in {
+    implicit val tid: TransactionId = transid()
+    val b1 = randomBytes(1000)
+    val b2 = randomBytes(2000)
+    val b3 = randomBytes(3000)
+
+    val docId = newDocId()
+    val r1 = store.attach(docId, "c1", ContentTypes.`application/octet-stream`, chunkedSource(b1)).futureValue
+    val r2 = store.attach(docId, "c2", ContentTypes.`application/json`, chunkedSource(b2)).futureValue
+    val r3 = store.attach(docId, "c3", ContentTypes.`application/json`, chunkedSource(b3)).futureValue
+
+    r1.length shouldBe 1000
+    r2.length shouldBe 2000
+    r3.length shouldBe 3000
+
+    attachmentBytes(docId, "c1").futureValue.result() shouldBe ByteString(b1)
+    attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
+    attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
+
+    //Delete a single attachment
+    store.deleteAttachment(docId, "c1").futureValue shouldBe true
+
+    //The deleted attachment must no longer be readable, while the other
+    //attachments of the same docId must still be accessible
+    attachmentBytes(docId, "c1").failed.futureValue shouldBe a[NoDocumentException]
+    attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
+    attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
+
+    //Delete all attachments
+    store.deleteAttachments(docId).futureValue shouldBe true
+
+    attachmentBytes(docId, "c2").failed.futureValue shouldBe a[NoDocumentException]
+    attachmentBytes(docId, "c3").failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  it should "throw NoDocumentException on reading non existing attachment" in {
+    implicit val tid: TransactionId = transid()
+
+    val docId = DocId("no-existing-id")
+    val f = store.readAttachment(docId, "code", byteStringSink())
+
+    f.failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  it should "not write an attachment when there is error in Source" in {
+    implicit val tid: TransactionId = transid()
+
+    val docId = newDocId()
+    val error = new Error("boom!")
+    val faultySource = Source(1 to 10)
+      .map { n ⇒
+        if (n == 7) throw error
+        n
+      }
+      .map(ByteString(_))
+    val writeResult = store.attach(docId, "code", 
ContentTypes.`application/octet-stream`, faultySource)
+    writeResult.failed.futureValue.getCause should be theSameInstanceAs error
+
+    val readResult = store.readAttachment(docId, "code", byteStringSink())
+    readResult.failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  override def afterAll(): Unit = {
+    if (garbageCollectAttachments) {
+      implicit val tid: TransactionId = transid()
+      val f =
+        Source(attachmentsToDelete.toList)
+          .mapAsync(2)(id => store.deleteAttachments(DocId(id)))
+          .runWith(Sink.ignore)
+      Await.result(f, 1.minute)
+    }
+    super.afterAll()
+  }
+
+  protected def garbageCollect(docId: DocId): Unit = {}
+
+  protected def newDocId(): DocId = {
+    //Create a unique id and track it so that any attachments stored
+    //against it can be garbage collected in afterAll. Only the id is used
+    //(no revision), as stores other than CouchDB do not support the
+    //revision property for blobs
+    counter = counter + 1
+    val docId = s"${prefix}_$counter"
+    attachmentsToDelete += docId
+    DocId(docId)
+  }
+
+  @volatile var counter = 0
+
+  private def attachmentBytes(id: DocId, name: String) = {
+    implicit val tid: TransactionId = transid()
+    store.readAttachment(id, name, byteStringSink())
+  }
+
+  private def chunkedSource(bytes: Array[Byte]): Source[ByteString, _] = {
+    StreamConverters.fromInputStream(() => new ByteArrayInputStream(bytes), 42)
+  }
+
+  private def byteStringSink() = {
+    Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+  }
+}
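
Note: a concrete suite only has to supply the store under test to inherit
all of the behaviors above. A minimal sketch, modelled on the in-memory
variant added by this commit (the exact body of MemoryAttachmentStoreTests
may differ; imports elided):

    class MemoryAttachmentStoreTests extends FlatSpec with AttachmentStoreBehaviors {
      // lazy so that the actor system and materializer from the mixins are initialized first
      override lazy val store: AttachmentStore = MemoryAttachmentStoreProvider.makeStore[WhiskEntity]()

      override def storeType: String = "Memory"
    }

Suites whose backing service is cleaned up by other means can override
garbageCollectAttachments to false to skip the bulk delete in afterAll.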
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 39ddad3..2b39fd3 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -21,26 +21,23 @@ import java.util.Base64
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
-import scala.util.{Failure, Random, Success, Try}
-import spray.json._
+import akka.http.scaladsl.model.ContentType
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.TransactionId
 import whisk.core.database._
 import whisk.core.database.memory.MemoryArtifactStore
-import whisk.core.entity._
-import whisk.core.entity.types.AuthStore
-import whisk.core.entity.types.EntityStore
-import akka.http.scaladsl.model.ContentType
-import akka.stream.scaladsl.Source
-import akka.util.ByteString
 import whisk.core.entity.Attachments.Attached
+import whisk.core.entity._
+import whisk.core.entity.types.{AuthStore, EntityStore}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.{Duration, DurationInt}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
 
 /**
 * WARNING: the put/get/del operations in this trait operate directly on the datastore,
@@ -278,7 +275,10 @@ trait DbUtils {
    */
   def cleanup()(implicit timeout: Duration = 10 seconds) = {
     docsToDelete.map { e =>
-      Try(Await.result(e._1.del(e._2)(TransactionId.testing), timeout))
+      Try {
+        Await.result(e._1.del(e._2)(TransactionId.testing), timeout)
+        Await.result(e._1.deleteAttachments(e._2)(TransactionId.testing), timeout)
+      }
     }
     docsToDelete.clear()
   }
@@ -321,7 +321,7 @@ trait DbUtils {
  def isMemoryStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[MemoryArtifactStore[_]]
  def isCouchStore(store: ArtifactStore[_]): Boolean = store.isInstanceOf[CouchDbRestStore[_]]
 
-  private def randomBytes(size: Int): Array[Byte] = {
+  protected def randomBytes(size: Int): Array[Byte] = {
     val arr = new Array[Byte](size)
     Random.nextBytes(arr)
     arr
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index 18083c3..de7a7a8 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -18,10 +18,12 @@
 package whisk.core.database.test.behavior
 
 import java.io.ByteArrayOutputStream
+import java.util.Base64
 
 import akka.http.scaladsl.model.{ContentTypes, Uri}
 import akka.stream.IOResult
-import akka.stream.scaladsl.StreamConverters
+import akka.stream.scaladsl.{Sink, StreamConverters}
+import akka.util.{ByteString, ByteStringBuilder}
 import whisk.common.TransactionId
 import whisk.core.database.{AttachmentInliner, CacheChangeNotification, NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
@@ -29,7 +31,7 @@ import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, WhiskAction}
 
 trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with ExecHelpers {
-  behavior of "Attachments"
+  behavior of s"${storeType}ArtifactStore attachments"
 
   private val namespace = newNS()
   private val attachmentHandler = Some(WhiskAction.attachmentHandler _)
@@ -58,6 +60,57 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
     attachmentUri.isAbsolute shouldBe true
   }
 
+  /**
+   * This test asserts that old attachments are deleted and cannot be read again
+   */
+  it should "fail on reading with old non-inlined attachment" in {
+    implicit val tid: TransactionId = transid()
+    val code1 = nonInlinedCode(entityStore)
+    val exec = javaDefault(code1, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_update_2"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val code2 = nonInlinedCode(entityStore)
+    val exec2 = javaDefault(code2, Some("hello"))
+    val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev)
+
+    val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue
+    val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((entityStore, i2))
+    getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2)
+    getAttachmentBytes(i1, attached(action2)).failed.futureValue shouldBe a[NoDocumentException]
+  }
+
+  /**
+   * Variant of the previous test: reading via the old attachment reference
+   * should still work if the attachment is inlined
+   */
+  it should "work on reading with old inlined attachment" in {
+    implicit val tid: TransactionId = transid()
+    val code1 = encodedRandomBytes(inlinedAttachmentSize(entityStore))
+    val exec = javaDefault(code1, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_update_2"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+    val code2 = nonInlinedCode(entityStore)
+    val exec2 = javaDefault(code2, Some("hello"))
+    val action2Updated = action2.copy(exec = exec2).revision[WhiskAction](i1.rev)
+
+    val i2 = WhiskAction.put(entityStore, action2Updated, old = Some(action2)).futureValue
+    val action3 = entityStore.get[WhiskAction](i2, attachmentHandler).futureValue
+
+    docsToDelete += ((entityStore, i2))
+    getAttachmentBytes(i2, attached(action3)).futureValue.result() shouldBe decode(code2)
+    getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe decode(code1)
+  }
+
   it should "put and read same attachment" in {
     implicit val tid: TransactionId = transid()
     val size = nonInlinedAttachmentSize(entityStore)
@@ -124,9 +177,44 @@ trait ArtifactStoreAttachmentBehaviors extends ArtifactStoreBehaviorBase with Ex
       .futureValue shouldBe a[NoDocumentException]
   }
 
+  it should "delete attachment on document delete" in {
+    val attachmentStore = getAttachmentStore(entityStore)
+    assume(attachmentStore.isDefined, "ArtifactStore does not have 
attachmentStore configured")
+
+    implicit val tid: TransactionId = transid()
+    val size = nonInlinedAttachmentSize(entityStore)
+    val base64 = encodedRandomBytes(size)
+
+    val exec = javaDefault(base64, Some("hello"))
+    val javaAction =
+      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+
+    val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
+    val action2 = entityStore.get[WhiskAction](i1, attachmentHandler).futureValue
+
+    WhiskAction.del(entityStore, i1).futureValue shouldBe true
+
+    val attachmentName = Uri(attached(action2).attachmentName).path.toString
+    attachmentStore.get
+      .readAttachment(i1.id, attachmentName, byteStringSink())
+      .failed
+      .futureValue shouldBe a[NoDocumentException]
+  }
+
  private def attached(a: WhiskAction): Attached =
    a.exec.asInstanceOf[CodeExec[Attachment[Nothing]]].code.asInstanceOf[Attached]

  private def inlined(a: WhiskAction): Inline[String] =
    a.exec.asInstanceOf[CodeExec[Attachment[String]]].code.asInstanceOf[Inline[String]]
+
+  private def getAttachmentBytes(docInfo: DocInfo, attached: Attached) = {
+    implicit val tid: TransactionId = transid()
+    entityStore.readAttachment(docInfo, attached, byteStringSink())
+  }
+
+  private def byteStringSink() = {
+    Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)
+  }
+
+  private def decode(s: String): ByteString = ByteString(Base64.getDecoder.decode(s))
 }
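
A note on the byteStringSink helper above: readAttachment is sink-agnostic,
so a caller can materialize the attachment however it likes. A minimal
sketch that streams the bytes to disk instead of folding them into memory;
the target path is a placeholder, docInfo and attachedMetadata stand in for
the values used in the tests, and an implicit TransactionId is assumed in
scope:

    import java.nio.file.Paths
    import akka.stream.scaladsl.FileIO

    // Materializes the file sink's Future[IOResult] instead of a ByteStringBuilder
    entityStore.readAttachment(docInfo, attachedMetadata, FileIO.toPath(Paths.get("/tmp/attachment.bin")))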
diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
index e22063d..239bbfc 100644
--- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
+++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreBehaviorBase.scala
@@ -25,8 +25,9 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
 import spray.json.{JsObject, JsValue}
 import whisk.common.TransactionId
+import whisk.core.database.memory.MemoryAttachmentStore
 import whisk.core.database.test.DbUtils
-import whisk.core.database.{ArtifactStore, StaleParameter}
+import whisk.core.database.{ArtifactStore, AttachmentStore, StaleParameter}
 import whisk.core.entity._
 import whisk.utils.JsHelpers
 
@@ -61,11 +62,13 @@ trait ArtifactStoreBehaviorBase
   }
 
   override def afterAll(): Unit = {
+    assertAttachmentStoreIsEmpty()
     println("Shutting down store connections")
     authStore.shutdown()
     entityStore.shutdown()
     activationStore.shutdown()
     super.afterAll()
+    assertAttachmentStoresAreClosed()
   }
 
   //~----------------------------------------< utility methods >
@@ -137,4 +140,30 @@ trait ArtifactStoreBehaviorBase
  protected def getJsField(js: JsObject, subObject: String, fieldName: String): JsValue = {
     js.fields(subObject).asJsObject().fields(fieldName)
   }
+
+  protected def getAttachmentStore(store: ArtifactStore[_]): Option[AttachmentStore]
+
+  protected def getAttachmentCount(store: AttachmentStore): Option[Int] = store match {
+    case s: MemoryAttachmentStore => Some(s.attachmentCount)
+    case _                        => None
+  }
+
+  private def assertAttachmentStoreIsEmpty(): Unit = {
+    Seq(authStore, entityStore, activationStore).foreach { s =>
+      for {
+        as <- getAttachmentStore(s)
+        count <- getAttachmentCount(as)
+      } require(count == 0, s"AttachmentStore not empty after all runs - 
$count")
+    }
+  }
+
+  private def assertAttachmentStoresAreClosed(): Unit = {
+    Seq(authStore, entityStore, activationStore).foreach { s =>
+      getAttachmentStore(s).foreach {
+        case s: MemoryAttachmentStore => require(s.isClosed, "AttachmentStore 
was not closed")
+        case _                        =>
+      }
+    }
+  }
+
 }
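
For reference, each concrete behavior base wires the new getAttachmentStore
hook up to its store so the leak and shutdown checks above can run. A
minimal sketch for a memory-backed suite; the attachmentStore accessor on
MemoryArtifactStore is assumed here purely for illustration:

    trait MemoryStoreBehaviorBase extends ArtifactStoreBehaviorBase {
      override protected def getAttachmentStore(store: ArtifactStore[_]): Option[AttachmentStore] =
        store match {
          // assumed accessor: the AttachmentStore the MemoryArtifactStore was built with
          case m: MemoryArtifactStore[_] => Some(m.attachmentStore)
          case _                         => None
        }
    }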

-- 
To stop receiving notification emails like this one, please contact
chet...@apache.org.
