This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f43f5d  Avoid converting ByteString to bytes in attachment inlining 
flow (#3777)
3f43f5d is described below

commit 3f43f5d85de56caf9f527e3e7a9b15a4ec46ea70
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Jun 21 06:34:43 2018 +0530

    Avoid converting ByteString to bytes in attachment inlining flow (#3777)
    
    Re-enables attachment inlining by "prefixAndTail" 1 ByteString at a time 
instead of flattening the source stream to byte.
    
    In addition logic to attach the attachment to external AttachmentStore is 
refactored to "AttachmentSupport" and reused across ArtifactSTore 
implementations.
---
 common/scala/src/main/resources/application.conf   |   6 +-
 .../whisk/core/database/AttachmentInliner.scala    | 117 -------------
 .../whisk/core/database/AttachmentSupport.scala    | 190 +++++++++++++++++++++
 .../whisk/core/database/CouchDbRestStore.scala     |  71 ++------
 .../core/database/memory/MemoryArtifactStore.scala |  35 +---
 .../database/memory/MemoryAttachmentStore.scala    |  11 +-
 ...nerTests.scala => AttachmentSupportTests.scala} |  28 ++-
 .../scala/whisk/core/database/test/DbUtils.scala   |   8 +-
 .../ArtifactStoreAttachmentBehaviors.scala         |  11 +-
 9 files changed, 237 insertions(+), 240 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 6aa1bbd..3ceaab8 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -117,11 +117,7 @@ whisk {
 
         # Size limit for inlined attachments. Attachments having size less 
than this would
         # be inlined with there content encoded in attachmentName
-        max-inline-size = 0 k
-
-        # Chunk sized for converting source of bytes to ByteString as part of 
attachment
-        # upload flow
-        chunk-size = 8 k
+        max-inline-size = 16 k
     }
 
     # CouchDB related configuration
diff --git 
a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala 
b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
deleted file mode 100644
index 2369b1b..0000000
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database
-
-import java.util.Base64
-
-import akka.NotUsed
-import akka.http.scaladsl.model.Uri
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Concat, Sink, Source}
-import akka.util.{ByteString, ByteStringBuilder}
-import whisk.core.database.AttachmentInliner.MemScheme
-import whisk.core.entity.ByteSize
-
-import scala.collection.immutable
-import scala.concurrent.Future
-
-object AttachmentInliner {
-
-  /**
-   * Scheme name for attachments which are inlined
-   */
-  val MemScheme: String = "mem"
-}
-
-case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
-
-/**
- * Provides support for inlining small attachments. Inlined attachment 
contents are encoded as part of attachment
- * name itself.
- */
-trait AttachmentInliner {
-
-  /** Materializer required for stream processing */
-  protected[core] implicit val materializer: Materializer
-
-  protected[database] def inlineAndTail(
-    docStream: Source[ByteString, _]): Future[(immutable.Seq[Byte], 
Source[Byte, _])] = {
-    docStream
-      .mapConcat(_.seq)
-      .prefixAndTail(maxInlineSize.toBytes.toInt)
-      .runWith(Sink.head[(immutable.Seq[Byte], Source[Byte, _])])
-  }
-
-  protected[database] def uriOf(bytes: Seq[Byte], path: => String): Uri = {
-    //For less than case its definitive that tail source would be empty
-    //for equal case it cannot be determined if tail source is empty. Actual 
max inline size
-    //would be inlineSize - 1
-    if (bytes.size < maxInlineSize.toBytes) {
-      Uri.from(scheme = MemScheme, path = encode(bytes))
-    } else {
-      Uri.from(scheme = attachmentScheme, path = path)
-    }
-  }
-
-  /**
-   * Constructs a combined source based on attachment content read so far and 
rest of unread content.
-   * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] 
elements.
-   */
-  protected[database] def combinedSource(inlinedBytes: immutable.Seq[Byte],
-                                         tailSource: Source[Byte, _]): 
Source[ByteString, NotUsed] =
-    Source
-      .combine(Source(inlinedBytes), tailSource)(Concat[Byte])
-      .batch[ByteStringBuilder](chunkSize.toBytes, b => { val bb = new 
ByteStringBuilder(); bb += b })((bb, b) =>
-        bb += b)
-      .map(_.result())
-
-  /**
-   * Constructs a source from inlined attachment contents
-   */
-  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] 
= {
-    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
-    Source.single(ByteString(decode(uri)))
-  }
-
-  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == 
MemScheme
-
-  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
-    val digester = StoreUtils.emptyDigest()
-    digester.update(bytes.toArray)
-    StoreUtils.encodeDigest(digester.digest())
-  }
-
-  /**
-   * Attachments having size less than this would be inlined
-   */
-  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
-
-  def chunkSize: ByteSize = inliningConfig.chunkSize
-
-  protected def inliningConfig: InliningConfig
-
-  protected def attachmentScheme: String
-
-  private def encode(bytes: Seq[Byte]): String = {
-    Base64.getUrlEncoder.encodeToString(bytes.toArray)
-  }
-
-  private def decode(uri: Uri): Array[Byte] = {
-    Base64.getUrlDecoder.decode(uri.path.toString())
-  }
-}
diff --git 
a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala 
b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
new file mode 100644
index 0000000..72f7754
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.util.Base64
+
+import akka.NotUsed
+import akka.http.scaladsl.model.{ContentType, Uri}
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import spray.json.DefaultJsonProtocol
+import whisk.common.TransactionId
+import whisk.core.database.AttachmentSupport.MemScheme
+import whisk.core.entity.Attachments.Attached
+import whisk.core.entity.{ByteSize, DocId, DocInfo, UUID}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AttachmentSupport {
+
+  /**
+   * Scheme name for attachments which are inlined
+   */
+  val MemScheme: String = "mem"
+}
+
+case class InliningConfig(maxInlineSize: ByteSize)
+
+/**
+ * Provides support for inlining small attachments. Inlined attachment 
contents are encoded as part of attachment
+ * name itself.
+ */
+trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends 
DefaultJsonProtocol {
+
+  /** Materializer required for stream processing */
+  protected[core] implicit val materializer: Materializer
+
+  protected def executionContext: ExecutionContext
+
+  /**
+   * Attachment scheme name to use for non inlined attachments
+   */
+  protected def attachmentScheme: String
+
+  protected def inliningConfig: InliningConfig
+
+  /**
+   * Attachments having size less than this would be inlined
+   */
+  def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
+
+  /**
+   * See {{ ArtifactStore#put }}
+   */
+  protected[database] def put(d: DocumentAbstraction)(implicit transid: 
TransactionId): Future[DocInfo]
+
+  /**
+   * Given a ByteString source it determines if the source can be inlined or 
not by returning an
+   * Either - Left(byteString) containing all the bytes from the source or 
Right(Source[ByteString, _])
+   * if the source is large
+   */
+  protected[database] def inlineOrAttach(
+    docStream: Source[ByteString, _],
+    previousPrefix: ByteString = ByteString.empty): Future[Either[ByteString, 
Source[ByteString, _]]] = {
+    implicit val ec = executionContext
+    docStream.prefixAndTail(1).runWith(Sink.head).flatMap {
+      case (Nil, _) =>
+        Future.successful(Left(previousPrefix))
+      case (Seq(prefix), tail) =>
+        val completePrefix = previousPrefix ++ prefix
+        if (completePrefix.size < maxInlineSize.toBytes) {
+          inlineOrAttach(tail, completePrefix)
+        } else {
+          Future.successful(Right(tail.prepend(Source.single(completePrefix))))
+        }
+    }
+  }
+
+  /**
+   * Constructs a URI for the attachment
+   *
+   * @param bytesOrSource either byteString or byteString source
+   * @param path function to generate the attachment name for non inlined case
+   * @return constructed uri. In case of inlined attachment the uri contains 
base64 encoded inlined attachment content
+   */
+  protected[database] def uriOf(bytesOrSource: Either[ByteString, 
Source[ByteString, _]], path: => String): Uri = {
+    bytesOrSource match {
+      case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
+      case Right(_)    => Uri.from(scheme = attachmentScheme, path = path)
+    }
+  }
+
+  /**
+   * Constructs a source from inlined attachment contents
+   */
+  protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed] 
= {
+    require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
+    Source.single(ByteString(decode(uri)))
+  }
+
+  protected[database] def isInlined(uri: Uri): Boolean = uri.scheme == 
MemScheme
+
+  /**
+   * Computes digest for passed bytes as hex encoded string
+   */
+  protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
+    val digester = StoreUtils.emptyDigest()
+    digester.update(bytes.toArray)
+    StoreUtils.encodeDigest(digester.digest())
+  }
+
+  /**
+   * Attaches the passed source content to  an {{ AttachmentStore }}
+   *
+   * @param doc document with attachment
+   * @param update function to update the `Attached` state with attachment 
metadata
+   * @param contentType contentType of the attachment
+   * @param docStream attachment source
+   * @param oldAttachment old attachment in case of update. Required for 
deleting the old attachment
+   * @param attachmentStore attachmentStore where attachment needs to be stored
+   *
+   * @return a tuple of updated document info and attachment metadata
+   */
+  protected[database] def attachToExternalStore[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached],
+    attachmentStore: AttachmentStore)(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
+
+    val asJson = doc.toDocumentRecord
+    val id = asJson.fields("_id").convertTo[String].trim
+
+    implicit val ec = executionContext
+
+    for {
+      bytesOrSource <- inlineOrAttach(docStream)
+      uri = uriOf(bytesOrSource, UUID().asString)
+      attached <- {
+        // Upload if cannot be inlined
+        bytesOrSource match {
+          case Left(bytes) =>
+            Future.successful(Attached(uri.toString, contentType, 
Some(bytes.size), Some(digest(bytes))))
+          case Right(source) =>
+            attachmentStore
+              .attach(DocId(id), uri.path.toString, contentType, source)
+              .map(r => Attached(uri.toString, contentType, Some(r.length), 
Some(r.digest)))
+        }
+      }
+      i1 <- put(update(doc, attached))
+
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == attachmentStore.scheme) {
+            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (i1, attached)
+  }
+
+  private def encode(bytes: Seq[Byte]): String = {
+    Base64.getUrlEncoder.encodeToString(bytes.toArray)
+  }
+
+  private def decode(uri: Uri): Array[Byte] = {
+    Base64.getUrlDecoder.decode(uri.path.toString())
+  }
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala 
b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 853adea..fba0def 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -27,7 +27,7 @@ import spray.json._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.database.StoreUtils._
 import whisk.core.entity.Attachments.Attached
-import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader, 
UUID}
+import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
 import whisk.http.Messages
 
 import scala.concurrent.duration._
@@ -61,7 +61,7 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
   docReader: DocumentReader)
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
-    with AttachmentInliner {
+    with AttachmentSupport[DocumentAbstraction] {
 
   protected[core] implicit val executionContext = 
system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
@@ -360,8 +360,8 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
     oldAttachment: Option[Attached])(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
 
     attachmentStore match {
-      case Some(_) =>
-        attachToExternalStore(doc, update, contentType, docStream, 
oldAttachment)
+      case Some(as) =>
+        attachToExternalStore(doc, update, contentType, docStream, 
oldAttachment, as)
       case None =>
         attachToCouch(doc, update, contentType, docStream)
     }
@@ -371,7 +371,7 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
     doc: A,
     update: (A, Attached) => A,
     contentType: ContentType,
-    docStream: Source[ByteString, _])(implicit transid: TransactionId) = {
+    docStream: Source[ByteString, _])(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
 
     if (maxInlineSize.toBytes == 0) {
       val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
@@ -382,67 +382,24 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
       } yield (i2, attached)
     } else {
       for {
-        (bytes, tailSource) <- inlineAndTail(docStream)
-        uri <- Future.successful(uriOf(bytes, UUID().asString))
+        bytesOrSource <- inlineOrAttach(docStream)
+        uri = uriOf(bytesOrSource, UUID().asString)
         attached <- {
-          val a = if (isInlined(uri)) {
-            Attached(uri.toString, contentType, Some(bytes.size), 
Some(digest(bytes)))
-          } else {
-            Attached(uri.toString, contentType)
+          val a = bytesOrSource match {
+            case Left(bytes) => Attached(uri.toString, contentType, 
Some(bytes.size), Some(digest(bytes)))
+            case Right(_)    => Attached(uri.toString, contentType)
           }
           Future.successful(a)
         }
         i1 <- put(update(doc, attached))
-        i2 <- if (isInlined(uri)) {
-          Future.successful(i1)
-        } else {
-          attach(i1, uri.path.toString, attached.attachmentType, 
combinedSource(bytes, tailSource))
+        i2 <- bytesOrSource match {
+          case Left(_)  => Future.successful(i1)
+          case Right(s) => attach(i1, uri.path.toString, 
attached.attachmentType, s)
         }
       } yield (i2, attached)
     }
   }
 
-  private def attachToExternalStore[A <: DocumentAbstraction](
-    doc: A,
-    update: (A, Attached) => A,
-    contentType: ContentType,
-    docStream: Source[ByteString, _],
-    oldAttachment: Option[Attached])(implicit transid: TransactionId) = {
-    val as = attachmentStore.get
-    val asJson = doc.toDocumentRecord
-    val id = asJson.fields("_id").convertTo[String].trim
-
-    for {
-      (bytes, tailSource) <- inlineAndTail(docStream)
-      uri <- Future.successful(uriOf(bytes, UUID().asString))
-      attached <- {
-        // Upload if cannot be inlined
-        if (isInlined(uri)) {
-          val a = Attached(uri.toString, contentType, Some(bytes.size), 
Some(digest(bytes)))
-          Future.successful(a)
-        } else {
-          as.attach(DocId(id), uri.path.toString, contentType, 
combinedSource(bytes, tailSource))
-            .map { r =>
-              Attached(uri.toString, contentType, Some(r.length), 
Some(r.digest))
-            }
-        }
-      }
-      i1 <- put(update(doc, attached))
-
-      //Remove old attachment if it was part of attachmentStore
-      _ <- oldAttachment
-        .map { old =>
-          val oldUri = Uri(old.attachmentName)
-          if (oldUri.scheme == as.scheme) {
-            as.deleteAttachment(DocId(id), oldUri.path.toString)
-          } else {
-            Future.successful(true)
-          }
-        }
-        .getOrElse(Future.successful(true))
-    } yield (i1, attached)
-  }
-
   private def attach(doc: DocInfo, name: String, contentType: ContentType, 
docStream: Source[ByteString, _])(
     implicit transid: TransactionId): Future[DocInfo] = {
 
@@ -492,7 +449,7 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
     val name = attached.attachmentName
     val attachmentUri = Uri(name)
     attachmentUri.scheme match {
-      case AttachmentInliner.MemScheme =>
+      case AttachmentSupport.MemScheme =>
         memorySource(attachmentUri).runWith(sink)
       case s if s == couchScheme || attachmentUri.isRelative =>
         //relative case is for compatibility with earlier naming approach 
where attachment name would be like 'jarfile'
diff --git 
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
 
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index 140b854..f01064c 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -94,7 +94,7 @@ class MemoryArtifactStore[DocumentAbstraction <: 
DocumentSerializer](dbName: Str
     extends ArtifactStore[DocumentAbstraction]
     with DefaultJsonProtocol
     with DocumentProvider
-    with AttachmentInliner {
+    with AttachmentSupport[DocumentAbstraction] {
 
   override protected[core] implicit val executionContext: ExecutionContext = 
system.dispatcher
 
@@ -288,38 +288,7 @@ class MemoryArtifactStore[DocumentAbstraction <: 
DocumentSerializer](dbName: Str
     contentType: ContentType,
     docStream: Source[ByteString, _],
     oldAttachment: Option[Attached])(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
-
-    val asJson = d.toDocumentRecord
-    val id = asJson.fields(_id).convertTo[String].trim
-    //Inlined attachment with Memory storage is not required. However to 
validate the constructs
-    //inlined support is implemented
-    for {
-      (bytes, tailSource) <- inlineAndTail(docStream)
-      uri <- Future.successful(uriOf(bytes, UUID().asString))
-      attached <- {
-        if (isInlined(uri)) {
-          val a = Attached(uri.toString, contentType, Some(bytes.size), 
Some(digest(bytes)))
-          Future.successful(a)
-        } else {
-          attachmentStore
-            .attach(DocId(id), uri.path.toString, contentType, 
combinedSource(bytes, tailSource))
-            .map { r =>
-              Attached(uri.toString, contentType, Some(r.length), 
Some(r.digest))
-            }
-        }
-      }
-      i1 <- put(update(d, attached))
-      _ <- oldAttachment
-        .map { old =>
-          val oldUri = Uri(old.attachmentName)
-          if (oldUri.scheme == attachmentStore.scheme) {
-            attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
-          } else {
-            Future.successful(true)
-          }
-        }
-        .getOrElse(Future.successful(true))
-    } yield (i1, attached)
+    attachToExternalStore(d, update, contentType, docStream, oldAttachment, 
attachmentStore)
   }
 
   override def shutdown(): Unit = {
diff --git 
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
 
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
index face871..cb80c24 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
@@ -62,7 +62,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system: 
ActorSystem,
     contentType: ContentType,
     docStream: Source[ByteString, _])(implicit transid: TransactionId): 
Future[AttachResult] = {
     require(name != null, "name undefined")
-    val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading 
attachment '$name' of document '$docId'")
+    val start =
+      transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading 
attachment '$name' of document 'id: $docId'")
 
     val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new 
ByteStringBuilder)((builder, b) => builder ++= b)
 
@@ -78,7 +79,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system: 
ActorSystem,
     reportFailure(
       g,
       start,
-      failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 
'$docId', failure: '${failure.getMessage}'")
+      failure =>
+        s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 'id: 
$docId', failure: '${failure.getMessage}'")
   }
 
   /**
@@ -88,7 +90,10 @@ class MemoryAttachmentStore(dbName: String)(implicit system: 
ActorSystem,
     implicit transid: TransactionId): Future[T] = {
 
     val start =
-      transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding 
attachment '$name' of document '$docId'")
+      transid.started(
+        this,
+        DATABASE_ATT_GET,
+        s"[ATT_GET] '$dbName' finding attachment '$name' of document 'id: 
$docId'")
 
     val f = attachments.get(attachmentKey(docId, name)) match {
       case Some(Attachment(bytes)) =>
diff --git 
a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala 
b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
similarity index 68%
rename from 
tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
rename to 
tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
index 783d57e..e93475d 100644
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
@@ -20,40 +20,38 @@ package whisk.core.database.test
 import akka.http.scaladsl.model.Uri
 import akka.stream.scaladsl.Source
 import akka.stream.{ActorMaterializer, Materializer}
-import akka.util.{ByteStringBuilder, CompactByteString}
+import akka.util.CompactByteString
 import common.WskActorSystem
 import org.junit.runner.RunWith
-import whisk.core.entity.size._
-import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-import whisk.core.database.{AttachmentInliner, InliningConfig}
+import org.scalatest.{FlatSpec, Matchers}
+import whisk.common.TransactionId
+import whisk.core.database.{AttachmentSupport, InliningConfig}
+import whisk.core.entity.WhiskEntity
+import whisk.core.entity.size._
 
 @RunWith(classOf[JUnitRunner])
-class AttachmentInlinerTests extends FlatSpec with Matchers with ScalaFutures 
with WskActorSystem {
+class AttachmentSupportTests extends FlatSpec with Matchers with ScalaFutures 
with WskActorSystem {
 
   behavior of "Attachment inlining"
 
   implicit val materializer: Materializer = ActorMaterializer()
 
   it should "not inline if maxInlineSize set to zero" in {
-    val inliner = new TestInliner(InliningConfig(maxInlineSize = 0.KB, 
chunkSize = 8.KB))
+    val inliner = new AttachmentSupportTestMock(InliningConfig(maxInlineSize = 
0.KB))
     val bs = CompactByteString("hello world")
 
-    val (head, tail) = inliner.inlineAndTail(Source.single(bs)).futureValue
-    val uri = inliner.uriOf(head, "foo")
+    val bytesOrSource = inliner.inlineOrAttach(Source.single(bs)).futureValue
+    val uri = inliner.uriOf(bytesOrSource, "foo")
 
     uri shouldBe Uri("test:foo")
-
-    val bsResult = toByteString(inliner.combinedSource(head, tail)).futureValue
-    bsResult shouldBe bs
   }
 
-  private def toByteString(docStream: Source[Traversable[Byte], _]) =
-    docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++= 
b).map(_.result().compact)
-
-  class TestInliner(val inliningConfig: InliningConfig) extends 
AttachmentInliner {
+  class AttachmentSupportTestMock(val inliningConfig: InliningConfig) extends 
AttachmentSupport[WhiskEntity] {
     override protected[core] implicit val materializer: Materializer = 
ActorMaterializer()
     override protected def attachmentScheme: String = "test"
+    override protected def executionContext = actorSystem.dispatcher
+    override protected[database] def put(d: WhiskEntity)(implicit transid: 
TransactionId) = ???
   }
 }
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala 
b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 47c884a..78910fc 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -296,7 +296,7 @@ trait DbUtils extends Assertions {
    */
   def inlinedAttachmentSize(db: ArtifactStore[_]): Int = {
     db match {
-      case inliner: AttachmentInliner =>
+      case inliner: AttachmentSupport[_] =>
         inliner.maxInlineSize.toBytes.toInt - 1
       case _ =>
         throw new IllegalStateException(s"ArtifactStore does not support 
attachment inlining $db")
@@ -308,10 +308,8 @@ trait DbUtils extends Assertions {
    */
   def nonInlinedAttachmentSize(db: ArtifactStore[_]): Int = {
     db match {
-      case inliner: AttachmentInliner =>
-        val inlineSize = inliner.maxInlineSize.toBytes.toInt
-        val chunkSize = inliner.chunkSize.toBytes.toInt
-        Math.max(inlineSize, chunkSize) * 2
+      case inliner: AttachmentSupport[_] =>
+        inliner.maxInlineSize.toBytes.toInt * 2
       case _ =>
         42
     }
diff --git 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
 
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index a32eb28..82b6b80 100644
--- 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++ 
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -25,7 +25,8 @@ import akka.stream.IOResult
 import akka.stream.scaladsl.{Sink, StreamConverters}
 import akka.util.{ByteString, ByteStringBuilder}
 import whisk.common.TransactionId
-import whisk.core.database.{AttachmentInliner, CacheChangeNotification, 
NoDocumentException}
+import whisk.core.entity.size._
+import whisk.core.database.{AttachmentSupport, CacheChangeNotification, 
NoDocumentException}
 import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
 import whisk.core.entity.test.ExecHelpers
 import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest, 
WhiskAction}
@@ -112,14 +113,14 @@ trait ArtifactStoreAttachmentBehaviors extends 
ArtifactStoreBehaviorBase with Ex
     getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe 
decode(code1)
   }
 
-  it should "put and read same attachment" in {
+  it should "put and read 5 MB attachment" in {
     implicit val tid: TransactionId = transid()
-    val size = nonInlinedAttachmentSize(entityStore)
+    val size = Math.max(nonInlinedAttachmentSize(entityStore), 
5.MB.toBytes.toInt)
     val base64 = encodedRandomBytes(size)
 
     val exec = javaDefault(base64, Some("hello"))
     val javaAction =
-      WhiskAction(namespace, EntityName("attachment_unique"), exec)
+      WhiskAction(namespace, EntityName("attachment_large"), exec)
 
     val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
     val action2 = entityStore.get[WhiskAction](i1, 
attachmentHandler).futureValue
@@ -161,7 +162,7 @@ trait ArtifactStoreAttachmentBehaviors extends 
ArtifactStoreBehaviorBase with Ex
     val a = attached(action2)
 
     val attachmentUri = Uri(a.attachmentName)
-    attachmentUri.scheme shouldBe AttachmentInliner.MemScheme
+    attachmentUri.scheme shouldBe AttachmentSupport.MemScheme
     a.length shouldBe Some(attachmentSize)
     a.digest should not be empty
   }

Reply via email to