This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new ea11ad4 Use batcher in CouchDbRestStore. (#2835) ea11ad4 is described below commit ea11ad42523a02130d565d46c44ed0fbb1f1aaee Author: Christian Bickel <git...@cbickel.de> AuthorDate: Mon Oct 30 17:37:47 2017 +0100 Use batcher in CouchDbRestStore. (#2835) --- .../scala/whisk/core/database/ArtifactStore.scala | 11 --- .../core/database/ArtifactStoreExceptions.scala | 2 + .../core/database/ArtifactStoreProvider.scala | 8 +- .../whisk/core/database/CouchDbRestStore.scala | 92 ++++++++++++++-------- .../whisk/core/database/CouchDbStoreProvider.scala | 9 ++- .../whisk/core/database/DocumentFactory.scala | 8 -- .../src/main/scala/whisk/core/entity/DocInfo.scala | 30 ++++++- .../main/scala/whisk/core/entity/WhiskStore.scala | 11 +-- .../scala/whisk/core/invoker/InvokerReactive.scala | 24 +++--- .../core/cli/test/SequenceMigrationTests.scala | 3 +- .../core/controller/test/ActivationsApiTests.scala | 10 +-- .../whisk/core/entity/test/DatastoreTests.scala | 2 + .../scala/whisk/core/entity/test/ViewTests.scala | 3 + 13 files changed, 127 insertions(+), 86 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala index dbae8e5..2a61730 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStore.scala @@ -55,17 +55,6 @@ trait ArtifactStore[DocumentAbstraction] { protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] /** - * Puts (saves) documents to database in bulk using a future. - * If the operation is successful, the future completes with DocId else an appropriate exception. 
- * - * @param ds the documents to put in the database - * @param transid the transaction id for logging - * @return a future that completes either with DocId - */ - protected[database] def put(ds: Seq[DocumentAbstraction])( - implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] - - /** * Deletes document from database using a future. * If the operation is successful, the future completes with true. * diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala index f949328..f8e7ec5 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreExceptions.scala @@ -26,3 +26,5 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message) case class DocumentUnreadable(message: String) extends ArtifactStoreException(message) + +case class PutException(message: String) extends ArtifactStoreException(message) diff --git a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala index ba471aa..2080d47 100644 --- a/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/ArtifactStoreProvider.scala @@ -18,6 +18,7 @@ package whisk.core.database import akka.actor.ActorSystem +import akka.stream.ActorMaterializer import spray.json.RootJsonFormat import whisk.common.Logging import whisk.core.WhiskConfig @@ -27,8 +28,11 @@ import whisk.spi.Spi * An Spi for providing ArtifactStore implementations */ trait ArtifactStoreProvider extends Spi { - def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)( + def makeStore[D <: 
DocumentSerializer](config: WhiskConfig, + name: WhiskConfig => String, + useBatching: Boolean = false)( implicit jsonFormat: RootJsonFormat[D], actorSystem: ActorSystem, - logging: Logging): ArtifactStore[D] + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] } diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index 90ede7e..0e66aee 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -20,22 +20,23 @@ package whisk.core.database import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.event.Logging.ErrorLevel import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.util.ByteString import spray.json._ import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId +import whisk.core.entity.BulkEntityResult import whisk.core.entity.DocInfo import whisk.core.entity.DocRevision import whisk.core.entity.WhiskDocument import whisk.http.Messages -import scala.util.{Failure, Success, Try} - /** * Basic client to put and delete artifacts in a data store. 
* @@ -47,13 +48,17 @@ import scala.util.{Failure, Success, Try} * @param dbName the name of the database to operate on * @param serializerEvidence confirms the document abstraction is serializable to a Document with an id */ -class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( - dbProtocol: String, - dbHost: String, - dbPort: Int, - dbUsername: String, - dbPassword: String, - dbName: String)(implicit system: ActorSystem, val logging: Logging, jsonFormat: RootJsonFormat[DocumentAbstraction]) +class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: String, + dbHost: String, + dbPort: Int, + dbUsername: String, + dbPassword: String, + dbName: String, + useBatching: Boolean = false)( + implicit system: ActorSystem, + val logging: Logging, + jsonFormat: RootJsonFormat[DocumentAbstraction], + materializer: ActorMaterializer) extends ArtifactStore[DocumentAbstraction] with DefaultJsonProtocol { @@ -62,6 +67,13 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( private val client: CouchDbRestClient = new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName) + // This is the number of allowed parallel requests for each entity, before batching starts. If there are already maxOpenDbRequests + // and more documents need to be stored, then all arriving documents will be put into batches (if enabled) to avoid a long queue. 
+ private val maxOpenDbRequests = system.settings.config.getInt("akka.http.host-connection-pool.max-open-requests") / 2 + + private val batcher: Batcher[JsObject, Either[ArtifactStoreException, DocInfo]] = + new Batcher(500, maxOpenDbRequests)(put(_)(TransactionId.unknown)) + override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { val asJson = d.toDocumentRecord @@ -70,29 +82,42 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( require(!id.isEmpty, "document id must be defined") val docinfoStr = s"id: $id, rev: ${rev.getOrElse("null")}" - val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$dbName' saving document: '${docinfoStr}'") - val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match { - case Some(r) => - client => - client.putDoc(id, r, asJson) - case None => - client => - client.putDoc(id, asJson) - } + val f = if (useBatching) { + batcher.put(asJson).map { e => + e match { + case Right(response) => + transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'") + response - val f = request(client).map { e => - e match { + case Left(e: DocumentConflictException) => + transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.") + // For compatibility. 
+ throw DocumentConflictException("conflict on 'put'") + + case Left(e: ArtifactStoreException) => + transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; ${e.getMessage}.") + throw PutException("error on 'put'") + } + } + } else { + val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match { + case Some(r) => + client => + client.putDoc(id, r, asJson) + case None => + client => + client.putDoc(id, asJson) + } + request(client).map { case Right(response) => transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'") response.convertTo[DocInfo] - case Left(StatusCodes.Conflict) => transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.") // For compatibility. throw DocumentConflictException("conflict on 'put'") - case Left(code) => transid.failed( this, @@ -109,22 +134,25 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer]( transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel)) } - override protected[database] def put(ds: Seq[DocumentAbstraction])( - implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = { + private def put(ds: Seq[JsObject])( + implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = { val count = ds.size val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents") - val request: Future[Either[StatusCode, JsArray]] = client.putDocs(ds.map(_.toDocumentRecord)) - - val f = request.map { + val f = client.putDocs(ds).map { _ match { case Right(response) => transid.finished(this, start, s"'$dbName' completed $count documents") - response.convertTo[Seq[JsValue]].map { result => - Try(result.convertTo[DocInfo]) match { - case Success(info) => Right(info) - case Failure(_) => Left(DocumentConflictException("conflict on 'bulk_put'")) - } + + 
response.convertTo[Seq[BulkEntityResult]].map { singleResult => + singleResult.error + .map { + case "conflict" => Left(DocumentConflictException("conflict on 'bulk_put'")) + case e => Left(PutException(s"Unexpected $e: ${singleResult.reason.getOrElse("")} on 'bulk_put'")) + } + .getOrElse { + Right(singleResult.toDocInfo) + } } case Left(code) => diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala index e3713d8..d2c08dd 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbStoreProvider.scala @@ -18,16 +18,18 @@ package whisk.core.database import akka.actor.ActorSystem +import akka.stream.ActorMaterializer import spray.json.RootJsonFormat import whisk.common.Logging import whisk.core.WhiskConfig object CouchDbStoreProvider extends ArtifactStoreProvider { - def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String)( + def makeStore[D <: DocumentSerializer](config: WhiskConfig, name: WhiskConfig => String, useBatching: Boolean)( implicit jsonFormat: RootJsonFormat[D], actorSystem: ActorSystem, - logging: Logging): ArtifactStore[D] = { + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = { require(config != null && config.isValid, "config is undefined or not valid") require( config.dbProvider == "Cloudant" || config.dbProvider == "CouchDB", @@ -43,6 +45,7 @@ object CouchDbStoreProvider extends ArtifactStoreProvider { config.dbPort.toInt, config.dbUsername, config.dbPassword, - name(config)) + name(config), + useBatching) } } diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala index 45d6d28..6aa9515 100644 --- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala +++ 
b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala @@ -164,14 +164,6 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { } } - def put[Wsuper >: W](db: ArtifactStore[Wsuper], docs: Seq[W])( - implicit transid: TransactionId): Future[Seq[Either[DocumentConflictException, DocInfo]]] = { - implicit val logger = db.logging - implicit val ec = db.executionContext - - db.put(docs) - } - def attach[Wsuper >: W]( db: ArtifactStore[Wsuper], doc: DocInfo, diff --git a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala index d03d3c1..b6e635a 100644 --- a/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala +++ b/common/scala/src/main/scala/whisk/core/entity/DocInfo.scala @@ -17,14 +17,16 @@ package whisk.core.entity -import whisk.core.entity.ArgNormalizer.trim import scala.util.Try -import spray.json.JsValue -import spray.json.RootJsonFormat + +import spray.json.DefaultJsonProtocol import spray.json.JsNull import spray.json.JsString +import spray.json.JsValue +import spray.json.RootJsonFormat import spray.json.deserializationError -import spray.json.DefaultJsonProtocol + +import whisk.core.entity.ArgNormalizer.trim /** * A DocId is the document id === primary key in the datastore. @@ -86,6 +88,22 @@ protected[core] case class DocInfo protected[entity] (id: DocId, rev: DocRevisio } } +/** + * A BulkEntityResult is wrapping the fields that are returned for a single document on a bulk-put of several documents. 
+ * http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#post--db-_bulk_docs + * + * @param id the document id + * @param rev the document revision, optional; this is an opaque value determined by the datastore + * @param error the error that occurred while trying to put this document into CouchDB + * @param reason the error message that corresponds to the error + */ +case class BulkEntityResult(id: String, + rev: DocRevision = DocRevision.empty, + error: Option[String], + reason: Option[String]) { + def toDocInfo = DocInfo(DocId(id), rev) +} + protected[core] object DocId extends ArgNormalizer[DocId] { /** @@ -151,3 +169,7 @@ protected[core] object DocInfo extends DefaultJsonProtocol { implicit val serdes = jsonFormat2(DocInfo.apply) } + +object BulkEntityResult extends DefaultJsonProtocol { + implicit val serdes = jsonFormat4(BulkEntityResult.apply) +} diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala index e766d61..df8d1fe 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala @@ -24,6 +24,7 @@ import scala.language.postfixOps import scala.util.Try import akka.actor.ActorSystem +import akka.stream.ActorMaterializer import spray.json.JsObject import spray.json.JsString import spray.json.RootJsonFormat @@ -97,7 +98,7 @@ object WhiskAuthStore { dbPort -> null, dbAuths -> null) - def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) = + def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskAuth](config, _.dbAuths) } @@ -112,10 +113,10 @@ object WhiskEntityStore { dbPort -> null, dbWhisk -> null) - def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) = + def datastore(config: WhiskConfig)(implicit system: 
ActorSystem, logging: Logging, materializer: ActorMaterializer) = SpiLoader .get[ArtifactStoreProvider] - .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging) + .makeStore[WhiskEntity](config, _.dbWhisk)(WhiskEntityJsonFormat, system, logging, materializer) } @@ -130,8 +131,8 @@ object WhiskActivationStore { dbPort -> null, dbActivations -> null) - def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging) = - SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations) + def datastore(config: WhiskConfig)(implicit system: ActorSystem, logging: Logging, materializer: ActorMaterializer) = + SpiLoader.get[ArtifactStoreProvider].makeStore[WhiskActivation](config, _.dbActivations, true) } /** diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 2308913..76b9c3e 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -22,8 +22,11 @@ import java.time.Instant import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.util.Failure +import scala.util.Success + import org.apache.kafka.common.errors.RecordTooLargeException + import akka.actor.ActorRefFactory import akka.actor.ActorSystem import akka.actor.Props @@ -44,7 +47,7 @@ import whisk.core.containerpool.ContainerPool import whisk.core.containerpool.ContainerProxy import whisk.core.containerpool.PrewarmingConfig import whisk.core.containerpool.Run -import whisk.core.database.{Batcher, DocumentConflictException, NoDocumentException} +import whisk.core.database.NoDocumentException import whisk.core.entity._ import whisk.core.entity.size._ import whisk.http.Messages @@ -54,6 +57,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa implicit 
actorSystem: ActorSystem, logging: Logging) { + implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val ec = actorSystem.dispatcher implicit val cfg = config @@ -123,23 +127,13 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } } - implicit val materializer: ActorMaterializer = ActorMaterializer() - // In worst case we need "maximumContainers" connections to load actions from the database. The remaining - // connections can safely be used for writing activations. - val maxOpenDbRequests = actorSystem.settings.config.getInt("akka.http.host-connection-pool.max-open-requests") - val maxOpenActivationRequests = (maxOpenDbRequests - maximumContainers) max (maxOpenDbRequests / 2) - val batcher: Batcher[WhiskActivation, Either[DocumentConflictException, DocInfo]] = - new Batcher(500, maxOpenActivationRequests)(WhiskActivation.put(activationStore, _)(TransactionId.invoker)) - /** Stores an activation in the database. */ val store = (tid: TransactionId, activation: WhiskActivation) => { implicit val transid = tid logging.info(this, "recording the activation result to the data store") - - batcher.put(activation).andThen { - case Success(Right(_)) => logging.info(this, s"recorded activation") - case Success(Left(t)) => logging.error(this, s"failed to record activation, $t") - case Failure(t) => logging.error(this, s"failed to record activation, $t") + WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen { + case Success(id) => logging.info(this, s"recorded activation") + case Failure(t) => logging.error(this, s"failed to record activation") } } diff --git a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala index b3fd9ce..977505f 100644 --- a/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala +++ b/tests/src/test/scala/whisk/core/cli/test/SequenceMigrationTests.scala @@ -26,6 +26,7 @@ 
import org.junit.runner.RunWith import org.scalatest.BeforeAndAfter import org.scalatest.junit.JUnitRunner +import akka.stream.ActorMaterializer import common.TestHelpers import common.TestUtils import common.Wsk @@ -33,7 +34,6 @@ import common.WskProps import common.WskTestHelpers import spray.json._ import spray.json.DefaultJsonProtocol.StringJsonFormat - import whisk.core.WhiskConfig import whisk.core.database.test.DbUtils import whisk.core.entity._ @@ -45,6 +45,7 @@ import whisk.core.entity.test.ExecHelpers @RunWith(classOf[JUnitRunner]) class SequenceMigrationTests extends TestHelpers with BeforeAndAfter with DbUtils with ExecHelpers with WskTestHelpers { + implicit val matzerializer = ActorMaterializer() implicit val wskprops = WskProps() val wsk = new Wsk val whiskConfig = new WhiskConfig(WhiskEntityStore.requiredProperties) diff --git a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala index 2af35df..548c612 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActivationsApiTests.scala @@ -19,16 +19,16 @@ package whisk.core.controller.test import java.time.Clock import java.time.Instant + import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.server.Route - +import akka.stream.ActorMaterializer import spray.json._ import spray.json.DefaultJsonProtocol._ - import whisk.core.controller.WhiskActivationsApi import whisk.core.database.ArtifactStoreProvider import whisk.core.entity._ @@ -445,10 +445,10 @@ class ActivationsApiTests extends ControllerTestCommon with WhiskActivationsApi } it should "report proper error when record is corrupted on get" in { - + implicit val 
materializer = ActorMaterializer() val activationStore = SpiLoader .get[ArtifactStoreProvider] - .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging) + .makeStore[WhiskEntity](whiskConfig, _.dbActivations)(WhiskEntityJsonFormat, system, logging, materializer) implicit val tid = transid() val entity = BadEntity(namespace, EntityName(ActivationId().toString)) put(activationStore, entity) diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala index 3a289d7..f850f6b 100644 --- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala @@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import akka.stream.ActorMaterializer import common.StreamLogging import common.WskActorSystem import whisk.core.WhiskConfig @@ -47,6 +48,7 @@ class DatastoreTests with ExecHelpers with StreamLogging { + implicit val materializer = ActorMaterializer() val namespace = EntityPath("test namespace") val config = new WhiskConfig(WhiskAuthStore.requiredProperties ++ WhiskEntityStore.requiredProperties) val datastore = WhiskEntityStore.datastore(config) diff --git a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala index 5be4ac4..2347170 100644 --- a/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/ViewTests.scala @@ -30,6 +30,7 @@ import org.scalatest.FlatSpec import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner +import akka.stream.ActorMaterializer import common.StreamLogging import common.WskActorSystem import whisk.core.WhiskConfig @@ -68,6 +69,8 @@ class ViewTests val creds2 = WhiskAuthHelpers.newAuth(Subject("t12345")) val namespace2 = 
EntityPath(creds2.subject.asString) + implicit val materializer = ActorMaterializer() + val config = new WhiskConfig(WhiskEntityStore.requiredProperties ++ WhiskActivationStore.requiredProperties) val entityStore = WhiskEntityStore.datastore(config) val activationStore = WhiskActivationStore.datastore(config) -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].