markusthoemmes closed pull request #3698: Make amount of parallel docker runs configurable. URL: https://github.com/apache/incubator-openwhisk/pull/3698
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 98b5c146fc..8fd93bf704 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -206,6 +206,7 @@ "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}" "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}" "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}" + "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}" "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}" "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}" "METRICS_KAMON": "{{ metrics.kamon.enabled }}" @@ -271,7 +272,7 @@ volumes: "{{ volumes|default('') }},{{ coverage_logs_dir }}/invoker:/coverage" when: coverage_enabled -- name: start invoker using docker cli +- name: start invoker docker_container: userns_mode: "host" pid_mode: "host" diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 7fdb10b621..5f7a8db5a0 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -213,7 +213,7 @@ object ConfigKeys { val db = "whisk.db" val docker = "whisk.docker" - val dockerTimeouts = s"$docker.timeouts" + val dockerClient = s"$docker.client" val dockerContainerFactory = s"${docker}.container-factory" val runc = "whisk.runc" val runcTimeouts = s"$runc.timeouts" diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 8edb3868a6..c471d1b726 100644 --- a/core/invoker/src/main/resources/application.conf +++ 
b/core/invoker/src/main/resources/application.conf @@ -10,15 +10,25 @@ whisk { poll-interval: 5 minutes } - # Timeouts for docker commands. Set to "Inf" to disable timeout. - docker.timeouts { - run: 1 minute - rm: 1 minute - pull: 10 minutes - ps: 1 minute - inspect: 1 minute - pause: 10 seconds - unpause: 10 seconds + docker.client { + # Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run) + # concurrently, there is a good chance that some of them will fail. + # See https://github.com/moby/moby/issues/29369 + # Use a semaphore to make sure that at most 10 `docker run` commands are active + # the same time. + # 0 means that there are infinite parallel runs. + parallel-runs: 10 + + # Timeouts for docker commands. Set to "Inf" to disable timeout. + timeouts { + run: 1 minute + rm: 1 minute + pull: 10 minutes + ps: 1 minute + inspect: 1 minute + pause: 10 seconds + unpause: 10 seconds + } } docker.container-factory { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 44102b8dcb..8a87e379e8 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -65,6 +65,11 @@ case class DockerClientTimeoutConfig(run: Duration, unpause: Duration, inspect: Duration) +/** + * Configuration for docker client + */ +case class DockerClientConfig(parallelRuns: Int, timeouts: DockerClientTimeoutConfig) + /** * Serves as interface to the docker CLI tool. * @@ -74,8 +79,7 @@ case class DockerClientTimeoutConfig(run: Duration, * You only need one instance (and you shouldn't get more). 
*/ class DockerClient(dockerHost: Option[String] = None, - timeouts: DockerClientTimeoutConfig = - loadConfigOrThrow[DockerClientTimeoutConfig](ConfigKeys.dockerTimeouts))( + config: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))( executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem) extends DockerApi with ProcessRunner { @@ -96,8 +100,9 @@ class DockerClient(dockerHost: Option[String] = None, Seq(dockerBin) ++ host } - protected val maxParallelRuns = 10 - protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true) + protected val maxParallelRuns = config.parallelRuns + protected val runSemaphore = + new Semaphore( /* permits= */ if (maxParallelRuns > 0) maxParallelRuns else Int.MaxValue, /* fair= */ true) // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run) // concurrently, there is a good chance that some of them will fail. @@ -114,7 +119,7 @@ class DockerClient(dockerHost: Option[String] = None, } }.flatMap { _ => // Iff the semaphore was acquired successfully - runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run) + runCmd(Seq("run", "-d") ++ args ++ Seq(image), config.timeouts.run) .andThen { // Release the semaphore as quick as possible regardless of the runCmd() result case _ => runSemaphore.release() @@ -139,26 +144,26 @@ class DockerClient(dockerHost: Option[String] = None, def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] = runCmd( Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString), - timeouts.inspect).flatMap { + config.timeouts.inspect).flatMap { case "<no value>" => Future.failed(new NoSuchElementException) case stdout => Future.successful(ContainerAddress(stdout)) } def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ()) + 
runCmd(Seq("pause", id.asString), config.timeouts.pause).map(_ => ()) def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ()) + runCmd(Seq("unpause", id.asString), config.timeouts.unpause).map(_ => ()) def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ()) + runCmd(Seq("rm", "-f", id.asString), config.timeouts.rm).map(_ => ()) def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)( implicit transid: TransactionId): Future[Seq[ContainerId]] = { val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value") } val allArg = if (all) Seq("--all") else Seq.empty[String] val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs - runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply)) + runCmd(cmd, config.timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply)) } /** @@ -169,11 +174,11 @@ class DockerClient(dockerHost: Option[String] = None, private val pullsInFlight = TrieMap[String, Future[Unit]]() def pull(image: String)(implicit transid: TransactionId): Future[Unit] = pullsInFlight.getOrElseUpdate(image, { - runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) } + runCmd(Seq("pull", image), config.timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) } }) def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = - runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean) + runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean) private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { val cmd = dockerCmd ++ args ---------------------------------------------------------------- This 
is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services