This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a57dd  Provide Artifact with File Storage Activation Store (#3991)
b3a57dd is described below

commit b3a57ddf7409cd71ebb84b85143f0920995132f2
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Thu Jan 24 13:53:53 2019 -0500

    Provide Artifact with File Storage Activation Store (#3991)
---
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   4 +
 .../core/database/ActivationFileStorage.scala      | 118 ++++++++++++++++
 .../ArtifactWithFileStorageActivationStore.scala   |  61 ++++++++
 .../openwhisk/core/entity/WhiskActivation.scala    |  14 +-
 .../apache/openwhisk/core/controller/Actions.scala |   2 +-
 .../openwhisk/core/controller/Activations.scala    |   4 +-
 .../core/controller/test/ActionsApiTests.scala     |   8 +-
 .../core/controller/test/ActivationsApiTests.scala |  12 +-
 .../controller/test/ControllerTestCommon.scala     |   8 +-
 ...tifactWithFileStorageActivationStoreTests.scala | 157 +++++++++++++++++++++
 10 files changed, 370 insertions(+), 18 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8061b2a..32f5a90 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -243,4 +243,8 @@ object ConfigKeys {
 
   val controller = s"whisk.controller"
   val controllerActivation = s"$controller.activation"
+
+  val activationStore = "whisk.activation-store"
+  val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
+
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
new file mode 100644
index 0000000..7fd7741
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.nio.file.attribute.PosixFilePermission._
+import java.nio.file.{Files, Path}
+import java.time.Instant
+import java.util.EnumSet
+
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
+import akka.util.ByteString
+import org.apache.openwhisk.common.Logging
+import 
org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import spray.json._
+
+import scala.concurrent.duration._
+
+class ActivationFileStorage(logFilePrefix: String,
+                            logPath: Path,
+                            actorMaterializer: ActorMaterializer,
+                            logging: Logging) {
+
+  implicit val materializer = actorMaterializer
+
+  private var logFile = logPath
+  private val bufferSize = 100.MB
+  private val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, 
GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
+  private val writeToFile: Sink[ByteString, _] = MergeHub
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
+    .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 
60.seconds, randomFactor = 0.2) { () =>
+      LogRotatorSink(() => {
+        val maxSize = bufferSize.toBytes
+        var bytesRead = maxSize
+        element =>
+          {
+            val size = element.size
+
+            if (bytesRead + size > maxSize) {
+              logFile = 
logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
+
+              logging.info(this, s"Rotating log file to '$logFile'")
+              createLogFile(logFile)
+              bytesRead = size
+              Some(logFile)
+            } else {
+              bytesRead += size
+              None
+            }
+          }
+      })
+    })
+    .run()
+
+  private def createLogFile(path: Path) =
+    try {
+      Files.createFile(path)
+      Files.setPosixFilePermissions(path, perms)
+    } catch {
+      case t: Throwable =>
+        logging.error(this, s"Couldn't create user log file '$t'")
+        throw t
+    }
+
+  private def transcribeLogs(activation: WhiskActivation, additionalFields: 
Map[String, JsValue]) =
+    activation.logs.logs.map { log =>
+      val line = JsObject(
+        Map("type" -> "user_log".toJson) ++ Map("message" -> log.toJson) ++ 
Map(
+          "activationId" -> activation.activationId.toJson) ++ 
additionalFields)
+
+      ByteString(s"${line.compactPrint}\n")
+    }
+
+  private def transcribeActivation(activation: WhiskActivation, 
additionalFields: Map[String, JsValue]) = {
+    val transactionType = Map("type" -> "activation_record".toJson)
+    val message = Map(
+      "message" -> s"Activation record '${activation.activationId}' for entity 
'${activation.name}'".toJson)
+    val annotations = activation.annotations.toJsObject.fields
+    val addFields = transactionType ++ annotations ++ message ++ 
additionalFields
+    val removeFields = Seq("logs", "annotations")
+    val line = activation.metadata.toExtendedJson(removeFields, addFields)
+
+    ByteString(s"${line.compactPrint}\n")
+  }
+
+  def getLogFile = logFile
+
+  def activationToFile(activation: WhiskActivation,
+                       context: UserContext,
+                       additionalFields: Map[String, JsValue] = Map.empty) = {
+    val transcribedLogs = transcribeLogs(activation, additionalFields)
+    val transcribedActivation = transcribeActivation(activation, 
additionalFields)
+
+    // Write each log line to file and then write the activation metadata
+    Source
+      .fromIterator(() => transcribedLogs.toIterator)
+      
.runWith(Flow[ByteString].concat(Source.single(transcribedActivation)).to(writeToFile))
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
new file mode 100644
index 0000000..597a1f9
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.nio.file.Paths
+
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.{DocInfo, _}
+import pureconfig.loadConfigOrThrow
+import spray.json._
+
+import scala.concurrent.Future
+
+case class ArtifactWithFileStorageActivationStoreConfig(logFilePrefix: String, 
logPath: String, userIdField: String)
+
+class ArtifactWithFileStorageActivationStore(
+  actorSystem: ActorSystem,
+  actorMaterializer: ActorMaterializer,
+  logging: Logging,
+  config: ArtifactWithFileStorageActivationStoreConfig =
+    
loadConfigOrThrow[ArtifactWithFileStorageActivationStoreConfig](ConfigKeys.activationStoreWithFileStorage))
+    extends ArtifactActivationStore(actorSystem, actorMaterializer, logging) {
+
+  private val activationFileStorage =
+    new ActivationFileStorage(config.logFilePrefix, Paths.get(config.logPath), 
actorMaterializer, logging)
+
+  def getLogFile = activationFileStorage.getLogFile
+
+  override def store(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    val additionalFields = Map(config.userIdField -> 
context.user.namespace.uuid.toJson)
+
+    activationFileStorage.activationToFile(activation, context, 
additionalFields)
+    super.store(activation, context)
+  }
+
+}
+
+object ArtifactWithFileStorageActivationStoreProvider extends 
ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: 
ActorMaterializer, logging: Logging) =
+    new ArtifactWithFileStorageActivationStore(actorSystem, actorMaterializer, 
logging)
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
index 59c892a..4222e51 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
@@ -99,9 +99,10 @@ case class WhiskActivation(namespace: EntityPath,
 
   def resultAsJson = response.result.toJson.asJsObject
 
-  def toExtendedJson = {
+  def toExtendedJson(removeFields: Seq[String] = Seq.empty, addFields: 
Map[String, JsValue] = Map.empty) = {
     val JsObject(baseFields) = WhiskActivation.serdes.write(this).asJsObject
-    val newFields = (baseFields - "response") + ("response" -> 
response.toExtendedJson)
+
+    val newFields = (baseFields - "response") + ("response" -> 
response.toExtendedJson) -- removeFields ++ addFields
     if (end != Instant.EPOCH) {
       val durationValue = (duration getOrElse (end.toEpochMilli - 
start.toEpochMilli)).toJson
       JsObject(newFields + ("duration" -> durationValue))
@@ -110,10 +111,17 @@ case class WhiskActivation(namespace: EntityPath,
     }
   }
 
+  def metadata = {
+    copy(response = response.withoutResult, annotations = Parameters(), logs = 
ActivationLogs())
+      .revision[WhiskActivation](rev)
+  }
+  def withoutResult = {
+    copy(response = response.withoutResult)
+      .revision[WhiskActivation](rev)
+  }
   def withoutLogsOrResult = {
     copy(response = response.withoutResult, logs = 
ActivationLogs()).revision[WhiskActivation](rev)
   }
-
   def withoutLogs = copy(logs = 
ActivationLogs()).revision[WhiskActivation](rev)
   def withLogs(logs: ActivationLogs) = copy(logs = 
logs).revision[WhiskActivation](rev)
 }
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
index 3587a7e..d535fd1 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
@@ -263,7 +263,7 @@ trait WhiskActionsApi extends WhiskCollectionAPI with 
PostActionActivation with
           complete(Accepted, activationId.toJsObject)
         }
       case Success(Right(activation)) =>
-        val response = if (result) activation.resultAsJson else 
activation.toExtendedJson
+        val response = if (result) activation.resultAsJson else 
activation.toExtendedJson()
 
         respondWithActivationIdHeader(activation.activationId) {
           if (activation.response.isSuccess) {
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
index 5e8f0ab..9100ba4 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
@@ -172,7 +172,7 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
           case None =>
             activationStore.listActivationsInNamespace(namespace, skip.n, 
limit.n, docs, since, upto, context)
         }
-        listEntities(activations map (_.fold((js) => js, (wa) => 
wa.map(_.toExtendedJson))))
+        listEntities(activations map (_.fold((js) => js, (wa) => 
wa.map(_.toExtendedJson()))))
       }
     }
   }
@@ -191,7 +191,7 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
     pathEndOrSingleSlash {
       getEntity(
         activationStore.get(ActivationId(docid.asString), context),
-        postProcess = Some((activation: WhiskActivation) => 
complete(activation.toExtendedJson)))
+        postProcess = Some((activation: WhiskActivation) => 
complete(activation.toExtendedJson())))
     } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(context, docid) } ~
       (pathPrefix(logsPath) & pathEnd) { fetchLogs(context, docid) }
   }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index b69536b..761a09b 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -1386,7 +1386,7 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
         Post(s"$collectionPath/${action.name}?blocking=true") ~> 
Route.seal(routes(creds)) ~> check {
           status should be(OK)
           val response = responseAs[JsObject]
-          response should be(activation.withoutLogs.toExtendedJson)
+          response should be(activation.withoutLogs.toExtendedJson())
         }
 
         // repeat invoke, get only result back
@@ -1422,7 +1422,7 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
       Post(s"$collectionPath/${action.name}?blocking=true") ~> 
Route.seal(routes(creds)) ~> check {
         status should be(OK)
         val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
+        response should be(activation.withoutLogs.toExtendedJson())
       }
 
       // repeat invoke, get only result back
@@ -1475,7 +1475,7 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
       Post(s"$collectionPath/${action.name}?blocking=true&timeout=500") ~> 
Route.seal(routes(creds)) ~> check {
         status shouldBe OK
         val response = responseAs[JsObject]
-        response shouldBe activation.withoutLogs.toExtendedJson
+        response shouldBe activation.withoutLogs.toExtendedJson()
         headers should contain(RawHeader(ActivationIdHeader, 
response.fields("activationId").convertTo[String]))
       }
     } finally {
@@ -1504,7 +1504,7 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
         Post(s"$collectionPath/${action.name}?blocking=true") ~> 
Route.seal(routes(creds)) ~> check {
           status should be(InternalServerError)
           val response = responseAs[JsObject]
-          response should be(activation.withoutLogs.toExtendedJson)
+          response should be(activation.withoutLogs.toExtendedJson())
           headers should contain(RawHeader(ActivationIdHeader, 
response.fields("activationId").convertTo[String]))
         }
       } finally {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
index f540c6e..8ad2e4b 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
@@ -188,7 +188,7 @@ class ActivationsApiTests extends ControllerTestCommon with 
WhiskActivationsApi
           status should be(OK)
           val response = responseAs[List[JsObject]]
           activations.length should be(response.length)
-          response should contain theSameElementsAs 
activations.map(_.toExtendedJson)
+          response should contain theSameElementsAs 
activations.map(_.toExtendedJson())
         }
       }
     } finally {
@@ -270,7 +270,7 @@ class ActivationsApiTests extends ControllerTestCommon with 
WhiskActivationsApi
             status should be(OK)
             val response = responseAs[List[JsObject]]
             expected.length should be(response.length)
-            response should contain theSameElementsAs 
expected.map(_.toExtendedJson)
+            response should contain theSameElementsAs 
expected.map(_.toExtendedJson())
           }
         }
       }
@@ -286,7 +286,7 @@ class ActivationsApiTests extends ControllerTestCommon with 
WhiskActivationsApi
             status should be(OK)
             val response = responseAs[List[JsObject]]
             expected.length should be(response.length)
-            response should contain theSameElementsAs 
expected.map(_.toExtendedJson)
+            response should contain theSameElementsAs 
expected.map(_.toExtendedJson())
           }
         }
       }
@@ -302,7 +302,7 @@ class ActivationsApiTests extends ControllerTestCommon with 
WhiskActivationsApi
             status should be(OK)
             val response = responseAs[List[JsObject]]
             expected.length should be(response.length)
-            response should contain theSameElementsAs 
expected.map(_.toExtendedJson)
+            response should contain theSameElementsAs 
expected.map(_.toExtendedJson())
           }
         }
       }
@@ -538,14 +538,14 @@ class ActivationsApiTests extends ControllerTestCommon 
with WhiskActivationsApi
       Get(s"$collectionPath/${activation.activationId.asString}") ~> 
Route.seal(routes(creds)) ~> check {
         status should be(OK)
         val response = responseAs[JsObject]
-        response should be(activation.toExtendedJson)
+        response should be(activation.toExtendedJson())
       }
 
       // it should "get activation by name in explicit namespace owned by 
subject" in
       
Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~> 
Route.seal(routes(creds)) ~> check {
         status should be(OK)
         val response = responseAs[JsObject]
-        response should be(activation.toExtendedJson)
+        response should be(activation.toExtendedJson())
       }
 
       // it should "reject get activation by name in explicit namespace not 
owned by subject" in
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index d623b3d..9e58639 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -125,7 +125,9 @@ protected trait ControllerTestCommon
       () => {
         val activations: Future[Either[List[JsObject], List[WhiskActivation]]] 
=
           activationStore.listActivationsInNamespace(namespace, 0, 0, context 
= context)
-        val listFuture: Future[List[JsObject]] = activations map (_.fold((js) 
=> js, (wa) => wa.map(_.toExtendedJson)))
+        val listFuture: Future[List[JsObject]] = activations map (_.fold(
+          (js) => js,
+          (wa) => wa.map(_.toExtendedJson())))
 
         listFuture map { l =>
           if (l.length != count) {
@@ -146,7 +148,9 @@ protected trait ControllerTestCommon
       () => {
         val activations: Future[Either[List[JsObject], List[WhiskActivation]]] 
=
           activationStore.listActivationsMatchingName(namespace, name, 0, 0, 
context = context)
-        val listFuture: Future[List[JsObject]] = activations map (_.fold((js) 
=> js, (wa) => wa.map(_.toExtendedJson)))
+        val listFuture: Future[List[JsObject]] = activations map (_.fold(
+          (js) => js,
+          (wa) => wa.map(_.toExtendedJson())))
 
         listFuture map { l =>
           if (l.length != count) {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
new file mode 100644
index 0000000..9253844
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.io.File
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpecLike, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size.SizeInt
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.io.Source
+
+@RunWith(classOf[JUnitRunner])
+class ArtifactWithFileStorageActivationStoreTests()
+    extends TestKit(ActorSystem("ArtifactWithFileStorageActivationStoreTests"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+
+  implicit val transid: TransactionId = TransactionId.testing
+  implicit val notifier: Option[CacheChangeNotification] = None
+
+  private val materializer = ActorMaterializer()
+  private val uuid = UUID()
+  private val subject = Subject()
+  private val user =
+    Identity(subject, Namespace(EntityName("testSpace"), uuid), 
BasicAuthenticationAuthKey(uuid, Secret()), Set())
+  private val context = UserContext(user, HttpRequest())
+
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 
10.seconds) = Await.result(awaitable, timeout)
+
+  def responsePermutations = {
+    val message = JsObject("result key" -> JsString("result value"))
+    Seq(
+      ActivationResponse.success(None),
+      ActivationResponse.success(Some(message)),
+      ActivationResponse.applicationError(message),
+      ActivationResponse.whiskError(message))
+  }
+
+  def logPermutations = {
+    Seq(
+      ActivationLogs(),
+      ActivationLogs(Vector("2018-03-05T02:10:38.196689520Z stdout: single log 
line")),
+      ActivationLogs(
+        Vector(
+          "2018-03-05T02:10:38.196689522Z stdout: first log line of multiple 
lines",
+          "2018-03-05T02:10:38.196754258Z stdout: second log line of multiple 
lines")))
+  }
+
+  def expectedFileContent(activation: WhiskActivation) = {
+    val expectedLogs = activation.logs.logs.map { log =>
+      JsObject(
+        "type" -> "user_log".toJson,
+        "message" -> log.toJson,
+        "activationId" -> activation.activationId.toJson,
+        "namespaceId" -> user.namespace.uuid.toJson)
+    }
+    val expectedActivation = JsObject(
+      "type" -> "activation_record".toJson,
+      "duration" -> activation.duration.toJson,
+      "name" -> activation.name.toJson,
+      "subject" -> activation.subject.toJson,
+      "waitTime" -> activation.annotations.get("waitTime").toJson.toJson,
+      "activationId" -> activation.activationId.toJson,
+      "namespaceId" -> user.namespace.uuid.toJson,
+      "publish" -> activation.publish.toJson,
+      "version" -> activation.version.toJson,
+      "response" -> activation.response.withoutResult.toExtendedJson,
+      "end" -> activation.end.toEpochMilli.toJson,
+      "message" -> s"Activation record '${activation.activationId}' for entity 
'${activation.name}'".toJson,
+      "kind" -> activation.annotations.get("kind").toJson.toJson,
+      "start" -> activation.start.toEpochMilli.toJson,
+      "limits" -> activation.annotations.get("limits").toJson.toJson,
+      "initTime" -> activation.annotations.get("initTime").toJson,
+      "namespace" -> activation.namespace.toJson)
+
+    expectedLogs ++ Seq(expectedActivation)
+  }
+
+  it should "store activations in artifact store and to file" in {
+    val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", 
"logs", "namespaceId")
+    val activationStore = new ArtifactWithFileStorageActivationStore(system, 
materializer, logging, config)
+    val logDir = new File(new File(".").getCanonicalPath, config.logPath)
+
+    try {
+      logDir.mkdir
+
+      val activations = responsePermutations.map { response =>
+        logPermutations.map { logs =>
+          val activation = WhiskActivation(
+            namespace = EntityPath(subject.asString),
+            name = EntityName("name"),
+            subject = subject,
+            activationId = ActivationId.generate(),
+            start = Instant.now,
+            end = Instant.now,
+            response = response,
+            logs = logs,
+            duration = Some(101L),
+            annotations = Parameters("kind", "nodejs:6") ++ Parameters(
+              "limits",
+              ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), 
LogLimit(10.MB)).toJson) ++
+              Parameters("waitTime", 16.toJson) ++
+              Parameters("initTime", 44.toJson))
+          val docInfo = await(activationStore.store(activation, context))
+          val fullyQualifiedActivationId = ActivationId(docInfo.id.asString)
+
+          await(activationStore.get(fullyQualifiedActivationId, context)) 
shouldBe activation
+          await(activationStore.delete(fullyQualifiedActivationId, context))
+          activation
+        }
+      }.flatten
+
+      Source
+        .fromFile(activationStore.getLogFile.toFile.getAbsoluteFile)
+        .getLines
+        .toList
+        .map(_.parseJson)
+        .toJson
+        .convertTo[JsArray] shouldBe 
activations.map(expectedFileContent).flatten.toJson.convertTo[JsArray]
+    } finally {
+      activationStore.getLogFile.toFile.getAbsoluteFile.delete
+      logDir.delete
+    }
+  }
+
+}

Reply via email to