This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 33d70bf  Avoid that actions do not stop after action timeout when logging heavily (#4299)
33d70bf is described below

commit 33d70bfbb368e8c263e9f7a6fccfd03cf6655d58
Author: Steffen Rost <sr...@de.ibm.com>
AuthorDate: Thu Feb 28 00:39:10 2019 +0000

    Avoid that actions do not stop after action timeout when logging heavily (#4299)
    
    * wait for sentinel only in case no timeout occurred
    
    * introduce timeout annotation
    
    * nodejs:6 logging timeout action w/o busy loop
    
    * do conversion from FiniteDuration to Duration
---
 .../logging/DockerToActivationFileLogStore.scala   |  5 +--
 .../logging/DockerToActivationLogStore.scala       | 39 ++++++++++++++++++++--
 .../openwhisk/core/entity/WhiskActivation.scala    | 17 ++++++----
 .../core/containerpool/ContainerProxy.scala        | 19 +++++++++--
 tests/dat/actions/loggingTimeout.js                | 36 ++++++++++++++++++++
 tests/src/test/scala/common/TimingHelpers.scala    |  6 ++--
 .../openwhisk/core/limits/ActionLimitsTests.scala  | 36 +++++++++++++++++++-
 7 files changed, 140 insertions(+), 18 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 868b9cf..2c9c0a9 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -123,7 +123,8 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:
                            container: Container,
                            action: ExecutableWhiskAction): Future[ActivationLogs] = {
 
-    val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)
+    val isTimedoutActivation = activation.isTimedoutActivation
+    val logs = logStream(transid, container, action, isTimedoutActivation)
 
     // Adding the userId field to every written record, so any background process can properly correlate.
     val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
@@ -150,7 +151,7 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:
 
     logs.runWith(combined)._1.flatMap { seq =>
       val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
-      val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains))
+      val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains))
       val logs = ActivationLogs(seq.toVector)
       if (!errored) {
         Future.successful(logs)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 7fa1ac6..4173a95 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -17,11 +17,13 @@
 
 package org.apache.openwhisk.core.containerpool.logging
 
+import java.time.Instant
 import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Source
 import akka.util.ByteString
 
 import org.apache.openwhisk.common.TransactionId
@@ -70,19 +72,50 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
   override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] =
     Future.successful(activation.logs)
 
+  /**
+   * Obtains the container's stdout and stderr output.
+   *
+   * In case of a timed out activation, do not wait for a sentinel to appear but instead
+   * collect the log as is and append a line stating that log data might be missing.
+   *
+   * @param transid transaction id
+   * @param container container to obtain the log from
+   * @param action action that defines the log limit and whether its logs are sentinelled
+   * @param isTimedoutActivation whether the activation ran into its timeout
+   *
+   * @return the container's raw log stream; for a timed out activation it is followed by an stderr LogLine carrying Messages.logFailure
+   */
+  protected def logStream(transid: TransactionId,
+                          container: Container,
+                          action: ExecutableWhiskAction,
+                          isTimedoutActivation: Boolean): Source[ByteString, Any] = {
+
+    // wait for a sentinel only if the activation did not time out, to avoid that log
+    // collection continues while the action code keeps logging after the timeout
+    val sentinel = action.exec.sentinelledLogs && !isTimedoutActivation
+    val logs = container.logs(action.limits.logs.asMegaBytes, sentinel)(transid)
+    val logsWithPossibleError = if (isTimedoutActivation) {
+      logs.concat(
+        Source.single(ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)))
+    } else logs
+    logsWithPossibleError
+  }
+
   override def collectLogs(transid: TransactionId,
                            user: Identity,
                            activation: WhiskActivation,
                            container: Container,
                            action: ExecutableWhiskAction): Future[ActivationLogs] = {
 
-    container
-      .logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)
+    val isTimedoutActivation = activation.isTimedoutActivation
+    val logs = logStream(transid, container, action, isTimedoutActivation)
+
+    logs
       .via(DockerToActivationLogStore.toFormattedString)
       .runWith(Sink.seq)
       .flatMap { seq =>
         val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes))
-        val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains))
+        val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains))
         val logs = ActivationLogs(seq.toVector)
         if (!errored) {
           Future.successful(logs)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
index 4222e51..5664c42 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
@@ -111,19 +111,23 @@ case class WhiskActivation(namespace: EntityPath,
     }
   }
 
-  def metadata = {
+  def metadata =
     copy(response = response.withoutResult, annotations = Parameters(), logs = ActivationLogs())
       .revision[WhiskActivation](rev)
-  }
-  def withoutResult = {
+
+  def withoutResult =
     copy(response = response.withoutResult)
       .revision[WhiskActivation](rev)
-  }
-  def withoutLogsOrResult = {
+
+  def withoutLogsOrResult =
     copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev)
-  }
+
   def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev)
+
   def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev)
+
+  def isTimedoutActivation = annotations.getAs[Boolean](WhiskActivation.timeoutAnnotation).getOrElse(false)
+
 }
 
 object WhiskActivation
@@ -140,6 +144,7 @@ object WhiskActivation
   val initTimeAnnotation = "initTime"
   val waitTimeAnnotation = "waitTime"
   val conductorAnnotation = "conductor"
+  val timeoutAnnotation = "timeout"
 
   /** Some field names for compositions */
   val actionField = "action"
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 7f45cc2..7419bf3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -285,7 +285,7 @@ class ContainerProxy(
             // also update the feed and active ack; the container cleanup is queued
             // implicitly via a FailureMessage which will be processed later when the state
             // transitions to Running
-            val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response)
+            val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, false, response)
             sendActiveAck(
               transid,
               activation,
@@ -581,12 +581,22 @@ class ContainerProxy(
               val initRunInterval = initInterval
                 .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
                 .getOrElse(runInterval)
-              ContainerProxy.constructWhiskActivation(job, initInterval, initRunInterval, response)
+              ContainerProxy.constructWhiskActivation(
+                job,
+                initInterval,
+                initRunInterval,
+                runInterval.duration >= actionTimeout,
+                response)
           }
       }
       .recover {
         case InitializationError(interval, response) =>
-          ContainerProxy.constructWhiskActivation(job, Some(interval), interval, response)
+          ContainerProxy.constructWhiskActivation(
+            job,
+            Some(interval),
+            interval,
+            interval.duration >= actionTimeout,
+            response)
         case t =>
           // Actually, this should never happen - but we want to make sure to not miss a problem
           logging.error(this, s"caught unexpected error while running activation: ${t}")
@@ -594,6 +604,7 @@ class ContainerProxy(
             job,
             None,
             Interval.zero,
+            false,
             ActivationResponse.whiskError(Messages.abnormalRun))
       }
 
@@ -708,6 +719,7 @@ object ContainerProxy {
   def constructWhiskActivation(job: Run,
                                initInterval: Option[Interval],
                                totalInterval: Interval,
+                               isTimeout: Boolean,
                                response: ActivationResponse) = {
     val causedBy = Some {
       if (job.msg.causedBySequence) {
@@ -745,6 +757,7 @@ object ContainerProxy {
         Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson) ++
           Parameters(WhiskActivation.pathAnnotation, JsString(job.action.fullyQualifiedName(false).asString)) ++
           Parameters(WhiskActivation.kindAnnotation, JsString(job.action.exec.kind)) ++
+          Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout)) ++
           causedBy ++ initTime
       })
   }
diff --git a/tests/dat/actions/loggingTimeout.js b/tests/dat/actions/loggingTimeout.js
new file mode 100644
index 0000000..a386de0
--- /dev/null
+++ b/tests/dat/actions/loggingTimeout.js
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+// This action prints log lines for a defined duration.
+// The output is throttled by a defined delay between two log lines
+// in order to keep the log size small and to stay within the log size limit.
+
+function getArg(value, defaultValue) {
+   return (value === undefined || value === null) ? defaultValue : value;
+}
+
+// input: { duration: <duration in millis>, delay: <delay in millis> }, e.g.
+// main( { delay: 100, duration: 10000 } );
+function main(args) {
+
+   const durationMillis = getArg(args.duration, 120000);
+   const delayMillis = getArg(args.delay, 100);
+
+   let logLines = 0;
+   const startMillis = new Date();
+
+   const timeout = setInterval(function() {
+      console.log(`[${ ++logLines }] The quick brown fox jumps over the lazy dog.`);
+   }, delayMillis);
+
+   return new Promise(function(resolve, reject) {
+      setTimeout(function() {
+         clearInterval(timeout);
+         const message = `hello, I'm back after ${new Date() - startMillis} ms and printed ${logLines} log lines`;
+         console.log(message);
+         resolve({ message: message });
+      }, durationMillis);
+   });
+
+}
+
diff --git a/tests/src/test/scala/common/TimingHelpers.scala b/tests/src/test/scala/common/TimingHelpers.scala
index d739222..8d3b2b8 100644
--- a/tests/src/test/scala/common/TimingHelpers.scala
+++ b/tests/src/test/scala/common/TimingHelpers.scala
@@ -21,10 +21,10 @@ import java.time.Instant
 import scala.concurrent.duration._
 
 trait TimingHelpers {
-  def between(start: Instant, end: Instant): Duration =
-    Duration.fromNanos(java.time.Duration.between(start, end).toNanos)
+  def between(start: Instant, end: Instant): FiniteDuration =
+    FiniteDuration(java.time.Duration.between(start, end).toNanos, NANOSECONDS)
 
-  def durationOf[A](block: => A): (Duration, A) = {
+  def durationOf[A](block: => A): (FiniteDuration, A) = {
     val start = Instant.now
     val value = block
     val end = Instant.now
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
index 23523c0..c70a8b1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
@@ -22,6 +22,7 @@ import akka.http.scaladsl.model.StatusCodes.BadGateway
 import java.io.File
 import java.io.PrintWriter
 import java.time.Instant
+
 import scala.concurrent.duration.{Duration, DurationInt}
 import scala.language.postfixOps
 import org.junit.runner.RunWith
@@ -30,6 +31,7 @@ import common.ActivationResult
 import common.TestHelpers
 import common.TestUtils
 import common.TestUtils.{BAD_REQUEST, DONTCARE_EXIT, SUCCESS_EXIT}
+import common.TimingHelpers
 import common.WhiskProperties
 import common.rest.WskRestOperations
 import common.WskProps
@@ -51,7 +53,7 @@ import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 
 @RunWith(classOf[JUnitRunner])
-class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSystem {
+class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSystem with TimingHelpers {
 
   implicit val wskprops = WskProps()
   val wsk = new WskRestOperations
@@ -463,4 +465,36 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys
       _.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson
     }
   }
+
+  /**
+   * Test that a heavy logging action is interrupted within its timeout limits.
+   */
+  it should "interrupt the heavy logging action within its time limits" in withAssetCleaner(wskprops) {
+    (wp, assetHelper) =>
+      val name = s"NodeJsTestLoggingActionCausingTimeout-${System.currentTimeMillis()}"
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) =>
+        action.create(
+          name,
+          Some(TestUtils.getTestActionFilename("loggingTimeout.js")),
+          timeout = Some(allowedActionDuration))
+      }
+      val duration = allowedActionDuration + 3.minutes
+      val checkDuration = allowedActionDuration + 1.minutes
+      val run =
+        wsk.action.invoke(name, Map("duration" -> duration.toMillis.toJson, "delay" -> 100.toJson))
+      withActivation(wsk.activation, run) { result =>
+        withClue("Activation result not as expected:") {
+          result.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
+          result.response.result.get
+            .fields("error") shouldBe Messages.timedoutActivation(allowedActionDuration, init = false).toJson
+          val logs = result.logs.get
+          logs.last should include(Messages.logFailure)
+
+          val parseLogTime = (line: String) => Instant.parse(line.split(' ').head)
+          val startTime = parseLogTime(logs.head)
+          val endTime = parseLogTime(logs.last)
+          between(startTime, endTime).toMillis should be < checkDuration.toMillis
+        }
+      }
+  }
 }

Reply via email to