This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 33d70bf Avoid that actions do not stop after action timeout when logging heavily (#4299) 33d70bf is described below commit 33d70bfbb368e8c263e9f7a6fccfd03cf6655d58 Author: Steffen Rost <sr...@de.ibm.com> AuthorDate: Thu Feb 28 00:39:10 2019 +0000 Avoid that actions do not stop after action timeout when logging heavily (#4299) * wait for sentinel only in case no timeout occurred * introduce timeout annotation * nodejs:6 logging timeout action w/o busy loop * do conversion from FiniteDuration to Duration --- .../logging/DockerToActivationFileLogStore.scala | 5 +-- .../logging/DockerToActivationLogStore.scala | 39 ++++++++++++++++++++-- .../openwhisk/core/entity/WhiskActivation.scala | 17 ++++++---- .../core/containerpool/ContainerProxy.scala | 19 +++++++++-- tests/dat/actions/loggingTimeout.js | 36 ++++++++++++++++++++ tests/src/test/scala/common/TimingHelpers.scala | 6 ++-- .../openwhisk/core/limits/ActionLimitsTests.scala | 36 +++++++++++++++++++- 7 files changed, 140 insertions(+), 18 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala index 868b9cf..2c9c0a9 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -123,7 +123,8 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] = { - val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid) + val isTimedoutActivation = activation.isTimedoutActivation + val logs = logStream(transid, container, action, 
isTimedoutActivation) // Adding the userId field to every written record, so any background process can properly correlate. val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson) @@ -150,7 +151,7 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: logs.runWith(combined)._1.flatMap { seq => val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) - val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains)) + val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) if (!errored) { Future.successful(logs) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala index 7fa1ac6..4173a95 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala @@ -17,11 +17,13 @@ package org.apache.openwhisk.core.containerpool.logging +import java.time.Instant import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Source import akka.util.ByteString import org.apache.openwhisk.common.TransactionId @@ -70,19 +72,50 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore { override def fetchLogs(activation: WhiskActivation, context: UserContext): Future[ActivationLogs] = Future.successful(activation.logs) + /** + * Obtains the container's stdout and stderr output. 
+ * + * In case of a timed out activation do not wait for a sentinel to appear but instead + * collect the log as is and add a message to the log that data might be missing + * + * @param transid transaction id + * @param container container to obtain the log from + * @param action action that defines the log limit + * @param isTimedoutActivation is activation timed out + * + * @return a stream of ByteStrings with the container's log output, followed by a log failure line if the activation timed out + */ + protected def logStream(transid: TransactionId, + container: Container, + action: ExecutableWhiskAction, + isTimedoutActivation: Boolean): Source[ByteString, Any] = { + + // wait for a sentinel only if the activation did not time out, to avoid + // that log collection continues while the action code still logs after the timeout + val sentinel = action.exec.sentinelledLogs && !isTimedoutActivation + val logs = container.logs(action.limits.logs.asMegaBytes, sentinel)(transid) + val logsWithPossibleError = if (isTimedoutActivation) { + logs.concat( + Source.single(ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint))) + } else logs + logsWithPossibleError + } + override def collectLogs(transid: TransactionId, user: Identity, activation: WhiskActivation, container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] = { - container - .logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid) + val isTimedoutActivation = activation.isTimedoutActivation + val logs = logStream(transid, container, action, isTimedoutActivation) + + logs .via(DockerToActivationLogStore.toFormattedString) .runWith(Sink.seq) .flatMap { seq => val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) - val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains)) + val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) if
(!errored) { Future.successful(logs) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala index 4222e51..5664c42 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala @@ -111,19 +111,23 @@ case class WhiskActivation(namespace: EntityPath, } } - def metadata = { + def metadata = copy(response = response.withoutResult, annotations = Parameters(), logs = ActivationLogs()) .revision[WhiskActivation](rev) - } - def withoutResult = { + + def withoutResult = copy(response = response.withoutResult) .revision[WhiskActivation](rev) - } - def withoutLogsOrResult = { + + def withoutLogsOrResult = copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev) - } + def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev) + def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev) + + def isTimedoutActivation = annotations.getAs[Boolean](WhiskActivation.timeoutAnnotation).getOrElse(false) + } object WhiskActivation @@ -140,6 +144,7 @@ object WhiskActivation val initTimeAnnotation = "initTime" val waitTimeAnnotation = "waitTime" val conductorAnnotation = "conductor" + val timeoutAnnotation = "timeout" /** Some field names for compositions */ val actionField = "action" diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index 7f45cc2..7419bf3 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -285,7 +285,7 @@ class ContainerProxy( // also update the feed and 
active ack; the container cleanup is queued // implicitly via a FailureMessage which will be processed later when the state // transitions to Running - val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response) + val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, false, response) sendActiveAck( transid, activation, @@ -581,12 +581,22 @@ class ContainerProxy( val initRunInterval = initInterval .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end)) .getOrElse(runInterval) - ContainerProxy.constructWhiskActivation(job, initInterval, initRunInterval, response) + ContainerProxy.constructWhiskActivation( + job, + initInterval, + initRunInterval, + runInterval.duration >= actionTimeout, + response) } } .recover { case InitializationError(interval, response) => - ContainerProxy.constructWhiskActivation(job, Some(interval), interval, response) + ContainerProxy.constructWhiskActivation( + job, + Some(interval), + interval, + interval.duration >= actionTimeout, + response) case t => // Actually, this should never happen - but we want to make sure to not miss a problem logging.error(this, s"caught unexpected error while running activation: ${t}") @@ -594,6 +604,7 @@ class ContainerProxy( job, None, Interval.zero, + false, ActivationResponse.whiskError(Messages.abnormalRun)) } @@ -708,6 +719,7 @@ object ContainerProxy { def constructWhiskActivation(job: Run, initInterval: Option[Interval], totalInterval: Interval, + isTimeout: Boolean, response: ActivationResponse) = { val causedBy = Some { if (job.msg.causedBySequence) { @@ -745,6 +757,7 @@ object ContainerProxy { Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson) ++ Parameters(WhiskActivation.pathAnnotation, JsString(job.action.fullyQualifiedName(false).asString)) ++ Parameters(WhiskActivation.kindAnnotation, JsString(job.action.exec.kind)) ++ + Parameters(WhiskActivation.timeoutAnnotation, 
JsBoolean(isTimeout)) ++ causedBy ++ initTime }) } diff --git a/tests/dat/actions/loggingTimeout.js b/tests/dat/actions/loggingTimeout.js new file mode 100644 index 0000000..a386de0 --- /dev/null +++ b/tests/dat/actions/loggingTimeout.js @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more contributor +// license agreements; and to You under the Apache License, Version 2.0. + +// This action prints log lines for a defined duration. +// The output is throttled by a defined delay between two log lines +// in order to keep the log size small and to stay within the log size limit. + +function getArg(value, defaultValue) { + return value ? value : defaultValue; +} + +// input: { duration: <duration in millis>, delay: <delay in millis> }, e.g. +// main( { delay: 100, duration: 10000 } ); +function main(args) { + + durationMillis = getArg(args.duration, 120000); + delayMillis = getArg(args.delay, 100); + + logLines = 0; + startMillis = new Date(); + + timeout = setInterval(function() { + console.log(`[${ ++logLines }] The quick brown fox jumps over the lazy dog.`); + }, delayMillis); + + return new Promise(function(resolve, reject) { + setTimeout(function() { + clearInterval(timeout); + message = `hello, I'm back after ${new Date() - startMillis} ms and printed ${logLines} log lines` + console.log(message) + resolve({ message: message }); + }, durationMillis); + }); + +} + diff --git a/tests/src/test/scala/common/TimingHelpers.scala b/tests/src/test/scala/common/TimingHelpers.scala index d739222..8d3b2b8 100644 --- a/tests/src/test/scala/common/TimingHelpers.scala +++ b/tests/src/test/scala/common/TimingHelpers.scala @@ -21,10 +21,10 @@ import java.time.Instant import scala.concurrent.duration._ trait TimingHelpers { - def between(start: Instant, end: Instant): Duration = - Duration.fromNanos(java.time.Duration.between(start, end).toNanos) + def between(start: Instant, end: Instant): FiniteDuration = + 
FiniteDuration(java.time.Duration.between(start, end).toNanos, NANOSECONDS) - def durationOf[A](block: => A): (Duration, A) = { + def durationOf[A](block: => A): (FiniteDuration, A) = { val start = Instant.now val value = block val end = Instant.now diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala index 23523c0..c70a8b1 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala @@ -22,6 +22,7 @@ import akka.http.scaladsl.model.StatusCodes.BadGateway import java.io.File import java.io.PrintWriter import java.time.Instant + import scala.concurrent.duration.{Duration, DurationInt} import scala.language.postfixOps import org.junit.runner.RunWith @@ -30,6 +31,7 @@ import common.ActivationResult import common.TestHelpers import common.TestUtils import common.TestUtils.{BAD_REQUEST, DONTCARE_EXIT, SUCCESS_EXIT} +import common.TimingHelpers import common.WhiskProperties import common.rest.WskRestOperations import common.WskProps @@ -51,7 +53,7 @@ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.http.Messages @RunWith(classOf[JUnitRunner]) -class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSystem { +class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSystem with TimingHelpers { implicit val wskprops = WskProps() val wsk = new WskRestOperations @@ -463,4 +465,36 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys _.response.result.get.fields("error") shouldBe Messages.memoryExhausted.toJson } } + + /** + * Test that a heavy logging action is interrupted within its timeout limits. 
+ */ + it should "interrupt the heavy logging action within its time limits" in withAssetCleaner(wskprops) { + (wp, assetHelper) => + val name = s"NodeJsTestLoggingActionCausingTimeout-${System.currentTimeMillis()}" + assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) => + action.create( + name, + Some(TestUtils.getTestActionFilename("loggingTimeout.js")), + timeout = Some(allowedActionDuration)) + } + val duration = allowedActionDuration + 3.minutes + val checkDuration = allowedActionDuration + 1.minutes + val run = + wsk.action.invoke(name, Map("durationMillis" -> duration.toMillis.toJson, "delayMillis" -> 100.toJson)) + withActivation(wsk.activation, run) { result => + withClue("Activation result not as expected:") { + result.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError) + result.response.result.get + .fields("error") shouldBe Messages.timedoutActivation(allowedActionDuration, init = false).toJson + val logs = result.logs.get + logs.last should include(Messages.logFailure) + + val parseLogTime = (line: String) => Instant.parse(line.split(' ').head) + val startTime = parseLogTime(logs.head) + val endTime = parseLogTime(logs.last) + between(startTime, endTime).toMillis should be < checkDuration.toMillis + } + } + } }