This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f95f219  At most 10 `docker run` commands are allowed in parallel. (#2995)
f95f219 is described below

commit f95f219f371002507ddd57291d56dbc8bf8f22ab
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Fri Nov 24 11:30:41 2017 +0100

    At most 10 `docker run` commands are allowed in parallel. (#2995)
    
    Docker < 1.13.1 has a known problem: if more than 10 containers are created (`docker run`) concurrently, there is a good chance that some of them will fail. See https://github.com/moby/moby/issues/29369
    
    Use a semaphore to make sure that at most 10 `docker run` commands are active at the same time.
---
 .../core/containerpool/docker/DockerClient.scala   |  59 +++++++----
 .../docker/test/DockerClientTests.scala            | 109 +++++++++++++++++++--
 2 files changed, 144 insertions(+), 24 deletions(-)

diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index bf94325..e6b3dab 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,18 +20,20 @@ package whisk.core.containerpool.docker
 import java.io.FileNotFoundException
 import java.nio.file.Files
 import java.nio.file.Paths
+import java.util.concurrent.Semaphore
 
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.blocking
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 import akka.event.Logging.ErrorLevel
+
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-
-import scala.collection.concurrent.TrieMap
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
 
@@ -75,25 +77,46 @@ class DockerClient(dockerHost: Option[String] = 
None)(executionContext: Executio
     Seq(dockerBin) ++ host
   }
 
+  protected val maxParallelRuns = 10
+  protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, 
/* fair= */ true)
+
+  // Docker < 1.13.1 has a known problem: if more than 10 containers are 
created (docker run)
+  // concurrently, there is a good chance that some of them will fail.
+  // See https://github.com/moby/moby/issues/29369
+  // Use a semaphore to make sure that at most 10 `docker run` commands are 
active
+  // the same time.
   def run(image: String, args: Seq[String] = Seq.empty[String])(
     implicit transid: TransactionId): Future[ContainerId] = {
-    runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
-      .map {
-        ContainerId(_)
-      }
-      .recoverWith {
-        // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
-        // Exit code 125 means an error reported by the Docker daemon.
-        // Examples:
-        // - Unrecognized option specified
-        // - Not enough disk space
-        case pre: ProcessRunningException if pre.exitCode == 125 =>
-          Future.failed(
-            DockerContainerId
-              .parse(pre.stdout)
-              .map(BrokenDockerContainer(_, s"Broken container: 
${pre.getMessage}"))
-              .getOrElse(pre))
+    Future {
+      blocking {
+        // Acquires a permit from this semaphore, blocking until one is 
available, or the thread is interrupted.
+        // Throws InterruptedException if the current thread is interrupted
+        runSemaphore.acquire()
       }
+    }.flatMap { _ =>
+      // Iff the semaphore was acquired successfully
+      runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+        .andThen {
+          // Release the semaphore as quick as possible regardless of the 
runCmd() result
+          case _ => runSemaphore.release()
+        }
+        .map {
+          ContainerId(_)
+        }
+        .recoverWith {
+          // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
+          // Exit code 125 means an error reported by the Docker daemon.
+          // Examples:
+          // - Unrecognized option specified
+          // - Not enough disk space
+          case pre: ProcessRunningException if pre.exitCode == 125 =>
+            Future.failed(
+              DockerContainerId
+                .parse(pre.stdout)
+                .map(BrokenDockerContainer(_, s"Broken container: 
${pre.getMessage}"))
+                .getOrElse(pre))
+        }
+    }
   }
 
   def inspectIPAddress(id: ContainerId, network: String)(implicit transid: 
TransactionId): Future[ContainerAddress] =
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 8f447cf..9c2b5b3 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,6 +17,8 @@
 
 package whisk.core.containerpool.docker.test
 
+import java.util.concurrent.Semaphore
+
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -27,26 +29,31 @@ import scala.concurrent.Promise
 import scala.util.Success
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
+import org.scalatest.time.{Seconds, Span}
 import common.StreamLogging
+
 import whisk.common.LogMarker
 import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
 import whisk.common.TransactionId
-import whisk.core.containerpool.docker.DockerClient
-import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
-import whisk.utils.retry
-import whisk.core.containerpool.docker.ProcessRunningException
-import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.docker.BrokenDockerContainer
+import whisk.core.containerpool.docker.DockerClient
+import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with 
BeforeAndAfterEach {
+class DockerClientTests extends FlatSpec with Matchers with StreamLogging with 
BeforeAndAfterEach with Eventually {
 
   override def beforeEach = stream.reset()
 
+  implicit override val patienceConfig = PatienceConfig(timeout = 
scaled(Span(5, Seconds)))
+
   implicit val transid = TransactionId.testing
   val id = 
ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
 
@@ -169,6 +176,96 @@ class DockerClientTests extends FlatSpec with Matchers 
with StreamLogging with B
     }
   }
 
+  it should "limit the number of concurrent docker run invocations" in {
+    // Delay execution of Docker run command
+    val firstRunPromise = Promise[String]()
+
+    val firstContainerId = ContainerId("1" * 64)
+    val secondContainerId = ContainerId("2" * 64)
+
+    var runCmdCount = 0
+    val dc = new DockerClient()(global) {
+      override val dockerCmd = Seq(dockerCommand)
+      override def executeProcess(args: String*)(implicit ec: 
ExecutionContext) = {
+        runCmdCount += 1
+        runCmdCount match {
+          case 1 => firstRunPromise.future
+          case 2 => Future.successful(secondContainerId.asString)
+          case _ => Future.failed(new Throwable())
+        }
+      }
+      // Need to override the semaphore, otherwise the tested code will still
+      // create the semaphore with the original value of maxParallelRuns.
+      override val maxParallelRuns = 1
+      override val runSemaphore = new Semaphore( /* permits= */ 
maxParallelRuns, /* fair= */ true)
+    }
+
+    val image = "image"
+    val args = Seq("args")
+
+    val firstRunResult = dc.run(image, args)
+    val secondRunResult = dc.run(image, args)
+
+    // The tested code won't reach the mocked executeProcess() and thus, 
increase runCmdCount,
+    // until at least one Future is successfully completed. For this reason, 
it takes
+    // some time until the following matcher is successful.
+    eventually { runCmdCount shouldBe 1 }
+
+    // Complete the first Docker run command so that the second is eligible to 
run
+    firstRunPromise.success(firstContainerId.asString)
+
+    // Cannot assert that the first Docker run always obtains the first 
container because
+    // the tested code uses Futures so that sequence may differ from test run 
to test run.
+    val firstResultContainerId = await(firstRunResult)
+
+    // Now, second command should be complete
+    eventually { runCmdCount shouldBe 2 }
+
+    val secondResultContainerId = await(secondRunResult)
+    Set(firstResultContainerId, secondResultContainerId) should contain 
theSameElementsAs Set(
+      firstContainerId,
+      secondContainerId)
+  }
+
+  it should "tolerate docker run errors when limiting the number of concurrent 
docker run invocations" in {
+    val secondContainerId = ContainerId("2" * 64)
+
+    var runCmdCount = 0
+    val dc = new DockerClient()(global) {
+      override val dockerCmd = Seq(dockerCommand)
+      override def executeProcess(args: String*)(implicit ec: 
ExecutionContext) = {
+        runCmdCount += 1
+        println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
+        runCmdCount match {
+          case 1 => Future.failed(ProcessRunningException(1, "", ""))
+          case 2 => Future.successful(secondContainerId.asString)
+          case _ => Future.failed(new Throwable())
+        }
+      }
+      // Need to override the semaphore, otherwise the tested code will still
+      // create the semaphore with the original value of maxParallelRuns.
+      override val maxParallelRuns = 1
+      override val runSemaphore = new Semaphore( /* permits= */ 
maxParallelRuns, /* fair= */ true)
+    }
+
+    val image = "image"
+    val args = Seq("args")
+
+    // Kick off the first Docker run command - it will fail.
+    val firstRunResult = dc.run(image, args)
+
+    an[Exception] should be thrownBy await(firstRunResult)
+    runCmdCount shouldBe 1
+
+    // Now kick off the second Docker run command - it is expected to succeed.
+    // If this command completes without timeout, the concurrency limit 
properly
+    // deals with errors.
+    val secondRunResult = dc.run(image, args)
+
+    await(secondRunResult) shouldBe secondContainerId
+    runCmdCount shouldBe 2
+  }
+
   it should "write proper log markers on a successful command" in {
     // a dummy string works here as we do not assert any output
     // from the methods below

-- 
To stop receiving notification emails like this one, please contact
commits@openwhisk.apache.org.

Reply via email to