This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 996d001 Diagnostic info and metrics for Docker command failures and timeouts (#4070) 996d001 is described below commit 996d00190553a42b451167200885521ee0dc491b Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Fri Oct 26 08:33:19 2018 +0200 Diagnostic info and metrics for Docker command failures and timeouts (#4070) This change improves diagnostic information for failing Docker commands as well as timed-out Docker commands: * For all failures (including timeouts), a textual representation of exit status values is logged. * Timeouts are explicitly detected and reported using a specialized exception, allowing better timeout handling in higher layers of the Docker container implementation. * A counter metric is emitted on Docker command timeouts. This change introduces a set of new counter metrics that are emitted if a Docker command is terminated because of a timeout. A high number of such timeouts is usually an indication of heavily loaded invokers. The new metrics help to identify such invokers. 
--- .../src/main/scala/whisk/common/Logging.scala | 3 + .../core/containerpool/docker/DockerClient.scala | 11 +-- .../core/containerpool/docker/ProcessRunner.scala | 84 ++++++++++++++++++++-- docs/metrics.md | 10 ++- .../docker/test/DockerClientTests.scala | 51 +++++++++---- .../docker/test/DockerContainerTests.scala | 2 +- .../docker/test/ProcessRunnerTests.scala | 50 ++++++++++--- .../kubernetes/test/KubernetesContainerTests.scala | 2 +- 8 files changed, 174 insertions(+), 39 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index 19642f9..d0457f8 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -243,6 +243,7 @@ object LoggingMarkers { val finish = "finish" val error = "error" val count = "count" + val timeout = "timeout" private val controller = "controller" private val invoker = "invoker" @@ -296,6 +297,8 @@ object LoggingMarkers { // Time in invoker val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start) def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, "docker", start, Some(cmd), Map("cmd" -> cmd)) + def INVOKER_DOCKER_CMD_TIMEOUT(cmd: String) = + LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd)) def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start, Some(cmd), Map("cmd" -> cmd)) def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd)) def INVOKER_CONTAINER_START(containerState: String) = diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 8a87e37..371b38a 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -33,9 +33,7 @@ 
import scala.util.Success import scala.util.Try import akka.event.Logging.{ErrorLevel, InfoLevel} import pureconfig.loadConfigOrThrow -import whisk.common.Logging -import whisk.common.LoggingMarkers -import whisk.common.TransactionId +import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} import whisk.core.ConfigKeys import whisk.core.containerpool.ContainerId import whisk.core.containerpool.ContainerAddress @@ -127,11 +125,11 @@ class DockerClient(dockerHost: Option[String] = None, .map(ContainerId.apply) .recoverWith { // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status - // Exit code 125 means an error reported by the Docker daemon. + // Exit status 125 means an error reported by the Docker daemon. // Examples: // - Unrecognized option specified // - Not enough disk space - case pre: ProcessRunningException if pre.exitCode == 125 => + case pre: ProcessUnsuccessfulException if pre.exitStatus == ExitStatus(125) => Future.failed( DockerContainerId .parse(pre.stdout) @@ -189,6 +187,9 @@ class DockerClient(dockerHost: Option[String] = None, logLevel = InfoLevel) executeProcess(cmd, timeout).andThen { case Success(_) => transid.finished(this, start) + case Failure(pte: ProcessTimeoutException) => + transid.failed(this, start, pte.getMessage, ErrorLevel) + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_DOCKER_CMD_TIMEOUT(args.head)) case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) } } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala index b27e62f..f943b80 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala @@ -32,7 +32,7 @@ trait ProcessRunner { * Runs the specified command with arguments asynchronously and * capture stdout as well as stderr. 
* - * If not set to infinite, after timeout is reached the process is killed. + * If not set to infinite, after timeout is reached the process is terminated. * * Be cautious with the execution context you pass because the command * is blocking. @@ -52,17 +52,87 @@ trait ProcessRunner { case _ => None } - (process.exitValue(), out.mkString("\n"), err.mkString("\n"), scheduled) + (ExitStatus(process.exitValue()), out.mkString("\n"), err.mkString("\n"), scheduled) }).flatMap { - case (0, stdout, _, scheduled) => + case (ExitStatus(0), stdout, _, scheduled) => scheduled.foreach(_.cancel()) Future.successful(stdout) - case (code, stdout, stderr, scheduled) => + case (exitStatus, stdout, stderr, scheduled) => scheduled.foreach(_.cancel()) - Future.failed(ProcessRunningException(code, stdout, stderr)) + timeout match { + case _: FiniteDuration if exitStatus.terminatedBySIGTERM => + Future.failed(ProcessTimeoutException(timeout, exitStatus, stdout, stderr)) + case _ => Future.failed(ProcessUnsuccessfulException(exitStatus, stdout, stderr)) + } + } +} + +object ExitStatus { + // Based on The Open Group Base Specifications Issue 7, 2018 edition: + // Shell & Utilities - Shell Command Language - 2.8.2 Exit Status for Commands + // http://pubs.opengroup.org/onlinepubs/9699919799/utilities/V3_chap02.html#tag_18_08_02 + val STATUS_SUCCESSFUL = 0 + val STATUS_NOT_EXECUTABLE = 126 + val STATUS_NOT_FOUND = 127 + // When a command is terminated by a signal, the exit status is 128 + signal number + val STATUS_SIGNAL = 128 + + // Based on The Open Group Base Specifications Issue 7, 2018 edition: + // Shell & Utilities - Utilities - kill + // http://pubs.opengroup.org/onlinepubs/9699919799/utilities/kill.html + val SIGHUP = 1 + val SIGINT = 2 + val SIGQUIT = 3 + val SIGABRT = 6 + val SIGKILL = 9 + val SIGALRM = 14 + val SIGTERM = 15 +} + +case class ExitStatus(statusValue: Int) { + + import ExitStatus._ + + override def toString(): String = { + def signalAsString(signal: Int): String = { + 
signal match { + case SIGHUP => "SIGHUP" + case SIGINT => "SIGINT" + case SIGQUIT => "SIGQUIT" + case SIGABRT => "SIGABRT" + case SIGKILL => "SIGKILL" + case SIGALRM => "SIGALRM" + case SIGTERM => "SIGTERM" + case _ => signal.toString + } + } + + val detail = statusValue match { + case STATUS_SUCCESSFUL => "successful" + case STATUS_NOT_EXECUTABLE => "not executable" + case STATUS_NOT_FOUND => "not found" + case _ if statusValue >= ExitStatus.STATUS_SIGNAL => + "terminated by signal " + signalAsString(statusValue - ExitStatus.STATUS_SIGNAL) + case _ => "unsuccessful" } + s"$statusValue ($detail)" + } + + val successful = statusValue == ExitStatus.STATUS_SUCCESSFUL + val terminatedBySIGTERM = (statusValue - ExitStatus.STATUS_SIGNAL) == ExitStatus.SIGTERM } -case class ProcessRunningException(exitCode: Int, stdout: String, stderr: String) - extends Exception(s"code: $exitCode ${if (exitCode == 143) "(killed)" else ""}, stdout: $stdout, stderr: $stderr") +abstract class ProcessRunningException(info: String, val exitStatus: ExitStatus, val stdout: String, val stderr: String) + extends Exception(s"info: $info, code: $exitStatus, stdout: $stdout, stderr: $stderr") + +case class ProcessUnsuccessfulException(override val exitStatus: ExitStatus, + override val stdout: String, + override val stderr: String) + extends ProcessRunningException("command was unsuccessful", exitStatus, stdout, stderr) + +case class ProcessTimeoutException(timeout: Duration, + override val exitStatus: ExitStatus, + override val stdout: String, + override val stderr: String) + extends ProcessRunningException(s"command was terminated, took longer than $timeout", exitStatus, stdout, stderr) diff --git a/docs/metrics.md b/docs/metrics.md index 9cfd887..4bf3520 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -182,34 +182,40 @@ Metrics below are for invoker state as recorded within load balancer monitoring. Following metrics capture stats around various docker command executions. 
-* Pause +* pause * `openwhisk.counter.invoker_docker.pause_start` * `openwhisk.counter.invoker_docker.pause_error` + * `openwhisk.counter.invoker_docker.pause_timeout` * `openwhisk.histogram.invoker_docker.pause_finish` * `openwhisk.histogram.invoker_docker.pause_error` -* Ps +* ps * `openwhisk.counter.invoker_docker.ps_start` * `openwhisk.counter.invoker_docker.ps_error` + * `openwhisk.counter.invoker_docker.ps_timeout` * `openwhisk.histogram.invoker_docker.ps_finish` * `openwhisk.histogram.invoker_docker.ps_error` * pull * `openwhisk.counter.invoker_docker.pull_start` * `openwhisk.counter.invoker_docker.pull_error` + * `openwhisk.counter.invoker_docker.pull_timeout` * `openwhisk.histogram.invoker_docker.pull_finish` * `openwhisk.histogram.invoker_docker.pull_error` * rm * `openwhisk.counter.invoker_docker.rm_start` * `openwhisk.counter.invoker_docker.rm_error` + * `openwhisk.counter.invoker_docker.rm_timeout` * `openwhisk.histogram.invoker_docker.rm_finish` * `openwhisk.histogram.invoker_docker.rm_error` * run * `openwhisk.counter.invoker_docker.run_start` * `openwhisk.counter.invoker_docker.run_error` + * `openwhisk.counter.invoker_docker.run_timeout` * `openwhisk.histogram.invoker_docker.run_finish` * `openwhisk.histogram.invoker_docker.run_error` * unpause * `openwhisk.counter.invoker_docker.unpause_start` * `openwhisk.counter.invoker_docker.unpause_error` + * `openwhisk.counter.invoker_docker.unpause_timeout` * `openwhisk.histogram.invoker_docker.unpause_finish` * `openwhisk.histogram.invoker_docker.unpause_error` diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala index 200bc99..66637f8 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala @@ -18,7 +18,6 @@ package whisk.core.containerpool.docker.test 
import akka.actor.ActorSystem - import java.util.concurrent.Semaphore import scala.concurrent.Await @@ -41,10 +40,7 @@ import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD import whisk.common.TransactionId import whisk.core.containerpool.ContainerAddress import whisk.core.containerpool.ContainerId -import whisk.core.containerpool.docker.BrokenDockerContainer -import whisk.core.containerpool.docker.DockerClient -import whisk.core.containerpool.docker.DockerContainerId -import whisk.core.containerpool.docker.ProcessRunningException +import whisk.core.containerpool.docker._ import whisk.utils.retry @RunWith(classOf[JUnitRunner]) @@ -246,7 +242,7 @@ class DockerClientTests runCmdCount += 1 println(s"runCmdCount=${runCmdCount}, args.last=${args.last}") runCmdCount match { - case 1 => Future.failed(ProcessRunningException(1, "", "")) + case 1 => Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", "")) case 2 => Future.successful(secondContainerId.asString) case _ => Future.failed(new Throwable()) } @@ -337,11 +333,11 @@ class DockerClientTests runAndVerify(dc.pull("image"), "pull") } - it should "fail with BrokenDockerContainer when run returns with exit code 125 and a container ID" in { + it should "fail with BrokenDockerContainer when run returns with exit status 125 and a container ID" in { val dc = dockerClient { Future.failed( - ProcessRunningException( - exitCode = 125, + ProcessUnsuccessfulException( + exitStatus = ExitStatus(125), stdout = id.asString, stderr = """/usr/bin/docker: Error response from daemon: mkdir /var/run/docker.1.1/libcontainerd.1.1/55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52: no space left on device.""")) @@ -350,19 +346,44 @@ class DockerClientTests bdc.id shouldBe id } - it should "fail with ProcessRunningException when run returns with exit code !=125 or no container ID" in { + it should "fail with ProcessRunningException when run returns with exit code !=125, no container ID or timeout" in { def 
runAndVerify(pre: ProcessRunningException, clue: String) = { val dc = dockerClient { Future.failed(pre) } - withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout = '${pre.stdout}', stderr = '${pre.stderr}': ") { + withClue(s"${clue} - exitStatus = ${pre.exitStatus}, stdout = '${pre.stdout}', stderr = '${pre.stderr}': ") { the[ProcessRunningException] thrownBy await(dc.run("image", Seq.empty)) shouldBe pre } } Seq[(ProcessRunningException, String)]( - (ProcessRunningException(126, id.asString, "Unknown command"), "Exit code not 125"), - (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container ID"), - (ProcessRunningException(1, "", ""), "Exit code not 125 and no container ID")).foreach { - case (pre, clue) => runAndVerify(pre, clue) + (ProcessUnsuccessfulException(ExitStatus(127), id.asString, "Unknown command"), "Exit code not 125"), + (ProcessUnsuccessfulException(ExitStatus(125), "", "Unknown flag: --foo"), "No container ID"), + (ProcessUnsuccessfulException(ExitStatus(1), "", ""), "Exit code not 125 and no container ID"), + (ProcessTimeoutException(1.second, ExitStatus(125), id.asString, ""), "Timeout instead of unsuccessful command")) + .foreach { + case (pre, clue) => runAndVerify(pre, clue) + } + } + + it should "fail with ProcessTimeoutException when command times out" in { + val expectedPTE = + ProcessTimeoutException(timeout = 10.seconds, exitStatus = ExitStatus(143), stdout = "stdout", stderr = "stderr") + val dc = dockerClient { + Future.failed(expectedPTE) } + Seq[(Future[_], String)]( + (dc.run("image", Seq.empty), "run"), + (dc.inspectIPAddress(id, "network"), "inspectIPAddress"), + (dc.pause(id), "pause"), + (dc.unpause(id), "unpause"), + (dc.rm(id), "rm"), + (dc.ps(), "ps"), + (dc.pull("image"), "pull"), + (dc.isOomKilled(id), "isOomKilled")) + .foreach { + case (cmd, clue) => + withClue(s"command '$clue' - ") { + the[ProcessTimeoutException] thrownBy await(cmd) shouldBe expectedPTE + } + } } } diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 6730e8c..c0e5e37 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -214,7 +214,7 @@ class DockerContainerTests override def run(image: String, args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] = { runs += ((image, args)) - Future.failed(ProcessRunningException(1, "", "")) + Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", "")) } } implicit val runc = stub[RuncApi] diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala index 4abe61f..4527373 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala @@ -26,13 +26,12 @@ import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import scala.concurrent.ExecutionContext.Implicits.global -import whisk.core.containerpool.docker.ProcessRunner +import whisk.core.containerpool.docker._ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.concurrent.Await import org.scalatest.Matchers -import whisk.core.containerpool.docker.ProcessRunningException import scala.language.reflectiveCalls // Needed to invoke run() method of structural ProcessRunner extension @@ -55,18 +54,53 @@ class ProcessRunnerTests extends FlatSpec with Matchers with WskActorSystem { } it should "run an external command unsuccessfully and capture its output" in { - val exitCode = 1 + val exitStatus = ExitStatus(1) val stdout = "Output" val stderr = "Error" - val future = 
processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr} 1>&2; exit ${exitCode}")) + val future = + processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr} 1>&2; exit ${exitStatus.statusValue}")) - the[ProcessRunningException] thrownBy await(future) shouldBe ProcessRunningException(exitCode, stdout, stderr) + val exception = the[ProcessRunningException] thrownBy await(future) + exception shouldBe ProcessUnsuccessfulException(exitStatus, stdout, stderr) + exception.getMessage should startWith("info: command was unsuccessful") } it should "terminate an external command after the specified timeout is reached" in { - val future = processRunner.run(Seq("sleep", "1"), 100.milliseconds) - val exception = the[ProcessRunningException] thrownBy await(future) - exception.exitCode shouldBe 143 + val timeout = 100.milliseconds + // Run "sleep" command for 1 second and make sure that stdout and stderr are dropped + val future = processRunner.run(Seq("/bin/sh", "-c", "sleep 1 1>/dev/null 2>/dev/null"), timeout) + val exception = the[ProcessTimeoutException] thrownBy await(future) + exception shouldBe ProcessTimeoutException(timeout, ExitStatus(143), "", "") + exception.getMessage should startWith(s"info: command was terminated, took longer than $timeout") + } + + behavior of "ExitStatus" + + it should "provide a proper textual representation" in { + Seq[(Int, String)]( + (0, "successful"), + (1, "unsuccessful"), + (125, "unsuccessful"), + (126, "not executable"), + (127, "not found"), + (128, "terminated by signal 0"), + (129, "terminated by signal SIGHUP"), + (130, "terminated by signal SIGINT"), + (131, "terminated by signal SIGQUIT"), + (134, "terminated by signal SIGABRT"), + (137, "terminated by signal SIGKILL"), + (142, "terminated by signal SIGALRM"), + (143, "terminated by signal SIGTERM"), + (144, "terminated by signal 16")).foreach { + case (statusValue, detailText) => + ExitStatus(statusValue).toString shouldBe s"$statusValue 
($detailText)" + } + } + + it should "properly classify exit status" in { + withClue("Exit status 0 is successful - ") { ExitStatus(0).successful shouldBe true } + withClue("Exit status 1 is not successful - ") { ExitStatus(1).successful shouldBe false } + withClue("Exit status 143 means terminated by SIGTERM - ") { ExitStatus(143).terminatedBySIGTERM shouldBe true } } } diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala index 227d3c9..573b581 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala @@ -173,7 +173,7 @@ class KubernetesContainerTests env: Map[String, String] = Map.empty, labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = { runs += ((name, image, env, labels)) - Future.failed(ProcessRunningException(1, "", "")) + Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", "")) } }