This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3f43f5d Avoid converting ByteString to bytes in attachment inlining
flow (#3777)
3f43f5d is described below
commit 3f43f5d85de56caf9f527e3e7a9b15a4ec46ea70
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Jun 21 06:34:43 2018 +0530
Avoid converting ByteString to bytes in attachment inlining flow (#3777)
Re-enables attachment inlining by "prefixAndTail" 1 ByteString at a time
instead of flattening the source stream to bytes.
In addition, logic to attach the attachment to an external AttachmentStore is
refactored into "AttachmentSupport" and reused across ArtifactStore
implementations.
---
common/scala/src/main/resources/application.conf | 6 +-
.../whisk/core/database/AttachmentInliner.scala | 117 -------------
.../whisk/core/database/AttachmentSupport.scala | 190 +++++++++++++++++++++
.../whisk/core/database/CouchDbRestStore.scala | 71 ++------
.../core/database/memory/MemoryArtifactStore.scala | 35 +---
.../database/memory/MemoryAttachmentStore.scala | 11 +-
...nerTests.scala => AttachmentSupportTests.scala} | 28 ++-
.../scala/whisk/core/database/test/DbUtils.scala | 8 +-
.../ArtifactStoreAttachmentBehaviors.scala | 11 +-
9 files changed, 237 insertions(+), 240 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 6aa1bbd..3ceaab8 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -117,11 +117,7 @@ whisk {
# Size limit for inlined attachments. Attachments having size less
than this would
# be inlined with there content encoded in attachmentName
- max-inline-size = 0 k
-
- # Chunk sized for converting source of bytes to ByteString as part of
attachment
- # upload flow
- chunk-size = 8 k
+ max-inline-size = 16 k
}
# CouchDB related configuration
diff --git
a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
b/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
deleted file mode 100644
index 2369b1b..0000000
--- a/common/scala/src/main/scala/whisk/core/database/AttachmentInliner.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.database
-
-import java.util.Base64
-
-import akka.NotUsed
-import akka.http.scaladsl.model.Uri
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Concat, Sink, Source}
-import akka.util.{ByteString, ByteStringBuilder}
-import whisk.core.database.AttachmentInliner.MemScheme
-import whisk.core.entity.ByteSize
-
-import scala.collection.immutable
-import scala.concurrent.Future
-
-object AttachmentInliner {
-
- /**
- * Scheme name for attachments which are inlined
- */
- val MemScheme: String = "mem"
-}
-
-case class InliningConfig(maxInlineSize: ByteSize, chunkSize: ByteSize)
-
-/**
- * Provides support for inlining small attachments. Inlined attachment
contents are encoded as part of attachment
- * name itself.
- */
-trait AttachmentInliner {
-
- /** Materializer required for stream processing */
- protected[core] implicit val materializer: Materializer
-
- protected[database] def inlineAndTail(
- docStream: Source[ByteString, _]): Future[(immutable.Seq[Byte],
Source[Byte, _])] = {
- docStream
- .mapConcat(_.seq)
- .prefixAndTail(maxInlineSize.toBytes.toInt)
- .runWith(Sink.head[(immutable.Seq[Byte], Source[Byte, _])])
- }
-
- protected[database] def uriOf(bytes: Seq[Byte], path: => String): Uri = {
- //For less than case its definitive that tail source would be empty
- //for equal case it cannot be determined if tail source is empty. Actual
max inline size
- //would be inlineSize - 1
- if (bytes.size < maxInlineSize.toBytes) {
- Uri.from(scheme = MemScheme, path = encode(bytes))
- } else {
- Uri.from(scheme = attachmentScheme, path = path)
- }
- }
-
- /**
- * Constructs a combined source based on attachment content read so far and
rest of unread content.
- * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]]
elements.
- */
- protected[database] def combinedSource(inlinedBytes: immutable.Seq[Byte],
- tailSource: Source[Byte, _]):
Source[ByteString, NotUsed] =
- Source
- .combine(Source(inlinedBytes), tailSource)(Concat[Byte])
- .batch[ByteStringBuilder](chunkSize.toBytes, b => { val bb = new
ByteStringBuilder(); bb += b })((bb, b) =>
- bb += b)
- .map(_.result())
-
- /**
- * Constructs a source from inlined attachment contents
- */
- protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed]
= {
- require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
- Source.single(ByteString(decode(uri)))
- }
-
- protected[database] def isInlined(uri: Uri): Boolean = uri.scheme ==
MemScheme
-
- protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
- val digester = StoreUtils.emptyDigest()
- digester.update(bytes.toArray)
- StoreUtils.encodeDigest(digester.digest())
- }
-
- /**
- * Attachments having size less than this would be inlined
- */
- def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
-
- def chunkSize: ByteSize = inliningConfig.chunkSize
-
- protected def inliningConfig: InliningConfig
-
- protected def attachmentScheme: String
-
- private def encode(bytes: Seq[Byte]): String = {
- Base64.getUrlEncoder.encodeToString(bytes.toArray)
- }
-
- private def decode(uri: Uri): Array[Byte] = {
- Base64.getUrlDecoder.decode(uri.path.toString())
- }
-}
diff --git
a/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
new file mode 100644
index 0000000..72f7754
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.util.Base64
+
+import akka.NotUsed
+import akka.http.scaladsl.model.{ContentType, Uri}
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import spray.json.DefaultJsonProtocol
+import whisk.common.TransactionId
+import whisk.core.database.AttachmentSupport.MemScheme
+import whisk.core.entity.Attachments.Attached
+import whisk.core.entity.{ByteSize, DocId, DocInfo, UUID}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AttachmentSupport {
+
+ /**
+ * Scheme name for attachments which are inlined
+ */
+ val MemScheme: String = "mem"
+}
+
+case class InliningConfig(maxInlineSize: ByteSize)
+
+/**
+ * Provides support for inlining small attachments. Inlined attachment
contents are encoded as part of attachment
+ * name itself.
+ */
+trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends
DefaultJsonProtocol {
+
+ /** Materializer required for stream processing */
+ protected[core] implicit val materializer: Materializer
+
+ protected def executionContext: ExecutionContext
+
+ /**
+ * Attachment scheme name to use for non inlined attachments
+ */
+ protected def attachmentScheme: String
+
+ protected def inliningConfig: InliningConfig
+
+ /**
+ * Attachments having size less than this would be inlined
+ */
+ def maxInlineSize: ByteSize = inliningConfig.maxInlineSize
+
+ /**
+ * See {{ ArtifactStore#put }}
+ */
+ protected[database] def put(d: DocumentAbstraction)(implicit transid:
TransactionId): Future[DocInfo]
+
+ /**
+ * Given a ByteString source it determines if the source can be inlined or
not by returning an
+ * Either - Left(byteString) containing all the bytes from the source or
Right(Source[ByteString, _])
+ * if the source is large
+ */
+ protected[database] def inlineOrAttach(
+ docStream: Source[ByteString, _],
+ previousPrefix: ByteString = ByteString.empty): Future[Either[ByteString,
Source[ByteString, _]]] = {
+ implicit val ec = executionContext
+ docStream.prefixAndTail(1).runWith(Sink.head).flatMap {
+ case (Nil, _) =>
+ Future.successful(Left(previousPrefix))
+ case (Seq(prefix), tail) =>
+ val completePrefix = previousPrefix ++ prefix
+ if (completePrefix.size < maxInlineSize.toBytes) {
+ inlineOrAttach(tail, completePrefix)
+ } else {
+ Future.successful(Right(tail.prepend(Source.single(completePrefix))))
+ }
+ }
+ }
+
+ /**
+ * Constructs a URI for the attachment
+ *
+ * @param bytesOrSource either byteString or byteString source
+ * @param path function to generate the attachment name for non inlined case
+ * @return constructed uri. In case of inlined attachment the uri contains
base64 encoded inlined attachment content
+ */
+ protected[database] def uriOf(bytesOrSource: Either[ByteString,
Source[ByteString, _]], path: => String): Uri = {
+ bytesOrSource match {
+ case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
+ case Right(_) => Uri.from(scheme = attachmentScheme, path = path)
+ }
+ }
+
+ /**
+ * Constructs a source from inlined attachment contents
+ */
+ protected[database] def memorySource(uri: Uri): Source[ByteString, NotUsed]
= {
+ require(uri.scheme == MemScheme, s"URI $uri scheme is not $MemScheme")
+ Source.single(ByteString(decode(uri)))
+ }
+
+ protected[database] def isInlined(uri: Uri): Boolean = uri.scheme ==
MemScheme
+
+ /**
+ * Computes digest for passed bytes as hex encoded string
+ */
+ protected[database] def digest(bytes: TraversableOnce[Byte]): String = {
+ val digester = StoreUtils.emptyDigest()
+ digester.update(bytes.toArray)
+ StoreUtils.encodeDigest(digester.digest())
+ }
+
+ /**
+ * Attaches the passed source content to an {{ AttachmentStore }}
+ *
+ * @param doc document with attachment
+ * @param update function to update the `Attached` state with attachment
metadata
+ * @param contentType contentType of the attachment
+ * @param docStream attachment source
+ * @param oldAttachment old attachment in case of update. Required for
deleting the old attachment
+ * @param attachmentStore attachmentStore where attachment needs to be stored
+ *
+ * @return a tuple of updated document info and attachment metadata
+ */
+ protected[database] def attachToExternalStore[A <: DocumentAbstraction](
+ doc: A,
+ update: (A, Attached) => A,
+ contentType: ContentType,
+ docStream: Source[ByteString, _],
+ oldAttachment: Option[Attached],
+ attachmentStore: AttachmentStore)(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
+
+ val asJson = doc.toDocumentRecord
+ val id = asJson.fields("_id").convertTo[String].trim
+
+ implicit val ec = executionContext
+
+ for {
+ bytesOrSource <- inlineOrAttach(docStream)
+ uri = uriOf(bytesOrSource, UUID().asString)
+ attached <- {
+ // Upload if cannot be inlined
+ bytesOrSource match {
+ case Left(bytes) =>
+ Future.successful(Attached(uri.toString, contentType,
Some(bytes.size), Some(digest(bytes))))
+ case Right(source) =>
+ attachmentStore
+ .attach(DocId(id), uri.path.toString, contentType, source)
+ .map(r => Attached(uri.toString, contentType, Some(r.length),
Some(r.digest)))
+ }
+ }
+ i1 <- put(update(doc, attached))
+
+ //Remove old attachment if it was part of attachmentStore
+ _ <- oldAttachment
+ .map { old =>
+ val oldUri = Uri(old.attachmentName)
+ if (oldUri.scheme == attachmentStore.scheme) {
+ attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
+ } else {
+ Future.successful(true)
+ }
+ }
+ .getOrElse(Future.successful(true))
+ } yield (i1, attached)
+ }
+
+ private def encode(bytes: Seq[Byte]): String = {
+ Base64.getUrlEncoder.encodeToString(bytes.toArray)
+ }
+
+ private def decode(uri: Uri): Array[Byte] = {
+ Base64.getUrlDecoder.decode(uri.path.toString())
+ }
+}
diff --git
a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 853adea..fba0def 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -27,7 +27,7 @@ import spray.json._
import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import whisk.core.database.StoreUtils._
import whisk.core.entity.Attachments.Attached
-import whisk.core.entity.{BulkEntityResult, DocId, DocInfo, DocumentReader,
UUID}
+import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
import whisk.http.Messages
import scala.concurrent.duration._
@@ -61,7 +61,7 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
docReader: DocumentReader)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
- with AttachmentInliner {
+ with AttachmentSupport[DocumentAbstraction] {
protected[core] implicit val executionContext =
system.dispatchers.lookup("dispatchers.couch-dispatcher")
@@ -360,8 +360,8 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
oldAttachment: Option[Attached])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
attachmentStore match {
- case Some(_) =>
- attachToExternalStore(doc, update, contentType, docStream,
oldAttachment)
+ case Some(as) =>
+ attachToExternalStore(doc, update, contentType, docStream,
oldAttachment, as)
case None =>
attachToCouch(doc, update, contentType, docStream)
}
@@ -371,7 +371,7 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
doc: A,
update: (A, Attached) => A,
contentType: ContentType,
- docStream: Source[ByteString, _])(implicit transid: TransactionId) = {
+ docStream: Source[ByteString, _])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
if (maxInlineSize.toBytes == 0) {
val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
@@ -382,67 +382,24 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
} yield (i2, attached)
} else {
for {
- (bytes, tailSource) <- inlineAndTail(docStream)
- uri <- Future.successful(uriOf(bytes, UUID().asString))
+ bytesOrSource <- inlineOrAttach(docStream)
+ uri = uriOf(bytesOrSource, UUID().asString)
attached <- {
- val a = if (isInlined(uri)) {
- Attached(uri.toString, contentType, Some(bytes.size),
Some(digest(bytes)))
- } else {
- Attached(uri.toString, contentType)
+ val a = bytesOrSource match {
+ case Left(bytes) => Attached(uri.toString, contentType,
Some(bytes.size), Some(digest(bytes)))
+ case Right(_) => Attached(uri.toString, contentType)
}
Future.successful(a)
}
i1 <- put(update(doc, attached))
- i2 <- if (isInlined(uri)) {
- Future.successful(i1)
- } else {
- attach(i1, uri.path.toString, attached.attachmentType,
combinedSource(bytes, tailSource))
+ i2 <- bytesOrSource match {
+ case Left(_) => Future.successful(i1)
+ case Right(s) => attach(i1, uri.path.toString,
attached.attachmentType, s)
}
} yield (i2, attached)
}
}
- private def attachToExternalStore[A <: DocumentAbstraction](
- doc: A,
- update: (A, Attached) => A,
- contentType: ContentType,
- docStream: Source[ByteString, _],
- oldAttachment: Option[Attached])(implicit transid: TransactionId) = {
- val as = attachmentStore.get
- val asJson = doc.toDocumentRecord
- val id = asJson.fields("_id").convertTo[String].trim
-
- for {
- (bytes, tailSource) <- inlineAndTail(docStream)
- uri <- Future.successful(uriOf(bytes, UUID().asString))
- attached <- {
- // Upload if cannot be inlined
- if (isInlined(uri)) {
- val a = Attached(uri.toString, contentType, Some(bytes.size),
Some(digest(bytes)))
- Future.successful(a)
- } else {
- as.attach(DocId(id), uri.path.toString, contentType,
combinedSource(bytes, tailSource))
- .map { r =>
- Attached(uri.toString, contentType, Some(r.length),
Some(r.digest))
- }
- }
- }
- i1 <- put(update(doc, attached))
-
- //Remove old attachment if it was part of attachmentStore
- _ <- oldAttachment
- .map { old =>
- val oldUri = Uri(old.attachmentName)
- if (oldUri.scheme == as.scheme) {
- as.deleteAttachment(DocId(id), oldUri.path.toString)
- } else {
- Future.successful(true)
- }
- }
- .getOrElse(Future.successful(true))
- } yield (i1, attached)
- }
-
private def attach(doc: DocInfo, name: String, contentType: ContentType,
docStream: Source[ByteString, _])(
implicit transid: TransactionId): Future[DocInfo] = {
@@ -492,7 +449,7 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
val name = attached.attachmentName
val attachmentUri = Uri(name)
attachmentUri.scheme match {
- case AttachmentInliner.MemScheme =>
+ case AttachmentSupport.MemScheme =>
memorySource(attachmentUri).runWith(sink)
case s if s == couchScheme || attachmentUri.isRelative =>
//relative case is for compatibility with earlier naming approach
where attachment name would be like 'jarfile'
diff --git
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
index 140b854..f01064c 100644
---
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
+++
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryArtifactStore.scala
@@ -94,7 +94,7 @@ class MemoryArtifactStore[DocumentAbstraction <:
DocumentSerializer](dbName: Str
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
with DocumentProvider
- with AttachmentInliner {
+ with AttachmentSupport[DocumentAbstraction] {
override protected[core] implicit val executionContext: ExecutionContext =
system.dispatcher
@@ -288,38 +288,7 @@ class MemoryArtifactStore[DocumentAbstraction <:
DocumentSerializer](dbName: Str
contentType: ContentType,
docStream: Source[ByteString, _],
oldAttachment: Option[Attached])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
-
- val asJson = d.toDocumentRecord
- val id = asJson.fields(_id).convertTo[String].trim
- //Inlined attachment with Memory storage is not required. However to
validate the constructs
- //inlined support is implemented
- for {
- (bytes, tailSource) <- inlineAndTail(docStream)
- uri <- Future.successful(uriOf(bytes, UUID().asString))
- attached <- {
- if (isInlined(uri)) {
- val a = Attached(uri.toString, contentType, Some(bytes.size),
Some(digest(bytes)))
- Future.successful(a)
- } else {
- attachmentStore
- .attach(DocId(id), uri.path.toString, contentType,
combinedSource(bytes, tailSource))
- .map { r =>
- Attached(uri.toString, contentType, Some(r.length),
Some(r.digest))
- }
- }
- }
- i1 <- put(update(d, attached))
- _ <- oldAttachment
- .map { old =>
- val oldUri = Uri(old.attachmentName)
- if (oldUri.scheme == attachmentStore.scheme) {
- attachmentStore.deleteAttachment(DocId(id), oldUri.path.toString)
- } else {
- Future.successful(true)
- }
- }
- .getOrElse(Future.successful(true))
- } yield (i1, attached)
+ attachToExternalStore(d, update, contentType, docStream, oldAttachment,
attachmentStore)
}
override def shutdown(): Unit = {
diff --git
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
index face871..cb80c24 100644
---
a/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
+++
b/common/scala/src/main/scala/whisk/core/database/memory/MemoryAttachmentStore.scala
@@ -62,7 +62,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system:
ActorSystem,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId):
Future[AttachResult] = {
require(name != null, "name undefined")
- val start = transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading
attachment '$name' of document '$docId'")
+ val start =
+ transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading
attachment '$name' of document 'id: $docId'")
val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new
ByteStringBuilder)((builder, b) => builder ++= b)
@@ -78,7 +79,8 @@ class MemoryAttachmentStore(dbName: String)(implicit system:
ActorSystem,
reportFailure(
g,
start,
- failure => s"[ATT_PUT] '$dbName' internal error, name: '$name', doc:
'$docId', failure: '${failure.getMessage}'")
+ failure =>
+ s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: 'id:
$docId', failure: '${failure.getMessage}'")
}
/**
@@ -88,7 +90,10 @@ class MemoryAttachmentStore(dbName: String)(implicit system:
ActorSystem,
implicit transid: TransactionId): Future[T] = {
val start =
- transid.started(this, DATABASE_ATT_GET, s"[ATT_GET] '$dbName' finding
attachment '$name' of document '$docId'")
+ transid.started(
+ this,
+ DATABASE_ATT_GET,
+ s"[ATT_GET] '$dbName' finding attachment '$name' of document 'id:
$docId'")
val f = attachments.get(attachmentKey(docId, name)) match {
case Some(Attachment(bytes)) =>
diff --git
a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
similarity index 68%
rename from
tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
rename to
tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
index 783d57e..e93475d 100644
--- a/tests/src/test/scala/whisk/core/database/test/AttachmentInlinerTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/AttachmentSupportTests.scala
@@ -20,40 +20,38 @@ package whisk.core.database.test
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
-import akka.util.{ByteStringBuilder, CompactByteString}
+import akka.util.CompactByteString
import common.WskActorSystem
import org.junit.runner.RunWith
-import whisk.core.entity.size._
-import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
-import whisk.core.database.{AttachmentInliner, InliningConfig}
+import org.scalatest.{FlatSpec, Matchers}
+import whisk.common.TransactionId
+import whisk.core.database.{AttachmentSupport, InliningConfig}
+import whisk.core.entity.WhiskEntity
+import whisk.core.entity.size._
@RunWith(classOf[JUnitRunner])
-class AttachmentInlinerTests extends FlatSpec with Matchers with ScalaFutures
with WskActorSystem {
+class AttachmentSupportTests extends FlatSpec with Matchers with ScalaFutures
with WskActorSystem {
behavior of "Attachment inlining"
implicit val materializer: Materializer = ActorMaterializer()
it should "not inline if maxInlineSize set to zero" in {
- val inliner = new TestInliner(InliningConfig(maxInlineSize = 0.KB,
chunkSize = 8.KB))
+ val inliner = new AttachmentSupportTestMock(InliningConfig(maxInlineSize =
0.KB))
val bs = CompactByteString("hello world")
- val (head, tail) = inliner.inlineAndTail(Source.single(bs)).futureValue
- val uri = inliner.uriOf(head, "foo")
+ val bytesOrSource = inliner.inlineOrAttach(Source.single(bs)).futureValue
+ val uri = inliner.uriOf(bytesOrSource, "foo")
uri shouldBe Uri("test:foo")
-
- val bsResult = toByteString(inliner.combinedSource(head, tail)).futureValue
- bsResult shouldBe bs
}
- private def toByteString(docStream: Source[Traversable[Byte], _]) =
- docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++=
b).map(_.result().compact)
-
- class TestInliner(val inliningConfig: InliningConfig) extends
AttachmentInliner {
+ class AttachmentSupportTestMock(val inliningConfig: InliningConfig) extends
AttachmentSupport[WhiskEntity] {
override protected[core] implicit val materializer: Materializer =
ActorMaterializer()
override protected def attachmentScheme: String = "test"
+ override protected def executionContext = actorSystem.dispatcher
+ override protected[database] def put(d: WhiskEntity)(implicit transid:
TransactionId) = ???
}
}
diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
index 47c884a..78910fc 100644
--- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
+++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala
@@ -296,7 +296,7 @@ trait DbUtils extends Assertions {
*/
def inlinedAttachmentSize(db: ArtifactStore[_]): Int = {
db match {
- case inliner: AttachmentInliner =>
+ case inliner: AttachmentSupport[_] =>
inliner.maxInlineSize.toBytes.toInt - 1
case _ =>
throw new IllegalStateException(s"ArtifactStore does not support
attachment inlining $db")
@@ -308,10 +308,8 @@ trait DbUtils extends Assertions {
*/
def nonInlinedAttachmentSize(db: ArtifactStore[_]): Int = {
db match {
- case inliner: AttachmentInliner =>
- val inlineSize = inliner.maxInlineSize.toBytes.toInt
- val chunkSize = inliner.chunkSize.toBytes.toInt
- Math.max(inlineSize, chunkSize) * 2
+ case inliner: AttachmentSupport[_] =>
+ inliner.maxInlineSize.toBytes.toInt * 2
case _ =>
42
}
diff --git
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index a32eb28..82b6b80 100644
---
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -25,7 +25,8 @@ import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, StreamConverters}
import akka.util.{ByteString, ByteStringBuilder}
import whisk.common.TransactionId
-import whisk.core.database.{AttachmentInliner, CacheChangeNotification,
NoDocumentException}
+import whisk.core.entity.size._
+import whisk.core.database.{AttachmentSupport, CacheChangeNotification,
NoDocumentException}
import whisk.core.entity.Attachments.{Attached, Attachment, Inline}
import whisk.core.entity.test.ExecHelpers
import whisk.core.entity.{CodeExec, DocInfo, EntityName, ExecManifest,
WhiskAction}
@@ -112,14 +113,14 @@ trait ArtifactStoreAttachmentBehaviors extends
ArtifactStoreBehaviorBase with Ex
getAttachmentBytes(i2, attached(action2)).futureValue.result() shouldBe
decode(code1)
}
- it should "put and read same attachment" in {
+ it should "put and read 5 MB attachment" in {
implicit val tid: TransactionId = transid()
- val size = nonInlinedAttachmentSize(entityStore)
+ val size = Math.max(nonInlinedAttachmentSize(entityStore),
5.MB.toBytes.toInt)
val base64 = encodedRandomBytes(size)
val exec = javaDefault(base64, Some("hello"))
val javaAction =
- WhiskAction(namespace, EntityName("attachment_unique"), exec)
+ WhiskAction(namespace, EntityName("attachment_large"), exec)
val i1 = WhiskAction.put(entityStore, javaAction, old = None).futureValue
val action2 = entityStore.get[WhiskAction](i1,
attachmentHandler).futureValue
@@ -161,7 +162,7 @@ trait ArtifactStoreAttachmentBehaviors extends
ArtifactStoreBehaviorBase with Ex
val a = attached(action2)
val attachmentUri = Uri(a.attachmentName)
- attachmentUri.scheme shouldBe AttachmentInliner.MemScheme
+ attachmentUri.scheme shouldBe AttachmentSupport.MemScheme
a.length shouldBe Some(attachmentSize)
a.digest should not be empty
}