This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new f95f219 At most 10 `docker run` commands are allowed in parallel. (#2995) f95f219 is described below commit f95f219f371002507ddd57291d56dbc8bf8f22ab Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Fri Nov 24 11:30:41 2017 +0100 At most 10 `docker run` commands are allowed in parallel. (#2995) Docker < 1.13.1 has a known problem: if more than 10 containers are created (`docker run`) concurrently, there is a good chance that some of them will fail. See https://github.com/moby/moby/issues/29369 Use a semaphore to make sure that at most 10 `docker run` commands are active the same time. --- .../core/containerpool/docker/DockerClient.scala | 59 +++++++---- .../docker/test/DockerClientTests.scala | 109 +++++++++++++++++++-- 2 files changed, 144 insertions(+), 24 deletions(-) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index bf94325..e6b3dab 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -20,18 +20,20 @@ package whisk.core.containerpool.docker import java.io.FileNotFoundException import java.nio.file.Files import java.nio.file.Paths +import java.util.concurrent.Semaphore +import scala.collection.concurrent.TrieMap +import scala.concurrent.blocking import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import akka.event.Logging.ErrorLevel + import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId - -import scala.collection.concurrent.TrieMap import whisk.core.containerpool.ContainerId import whisk.core.containerpool.ContainerAddress @@ -75,25 +77,46 @@ class DockerClient(dockerHost: Option[String] 
= None)(executionContext: Executio Seq(dockerBin) ++ host } + protected val maxParallelRuns = 10 + protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true) + + // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run) + // concurrently, there is a good chance that some of them will fail. + // See https://github.com/moby/moby/issues/29369 + // Use a semaphore to make sure that at most 10 `docker run` commands are active + // the same time. def run(image: String, args: Seq[String] = Seq.empty[String])( implicit transid: TransactionId): Future[ContainerId] = { - runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*) - .map { - ContainerId(_) - } - .recoverWith { - // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status - // Exit code 125 means an error reported by the Docker daemon. - // Examples: - // - Unrecognized option specified - // - Not enough disk space - case pre: ProcessRunningException if pre.exitCode == 125 => - Future.failed( - DockerContainerId - .parse(pre.stdout) - .map(BrokenDockerContainer(_, s"Broken container: ${pre.getMessage}")) - .getOrElse(pre)) + Future { + blocking { + // Acquires a permit from this semaphore, blocking until one is available, or the thread is interrupted. + // Throws InterruptedException if the current thread is interrupted + runSemaphore.acquire() } + }.flatMap { _ => + // Iff the semaphore was acquired successfully + runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*) + .andThen { + // Release the semaphore as quick as possible regardless of the runCmd() result + case _ => runSemaphore.release() + } + .map { + ContainerId(_) + } + .recoverWith { + // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status + // Exit code 125 means an error reported by the Docker daemon. 
+ // Examples: + // - Unrecognized option specified + // - Not enough disk space + case pre: ProcessRunningException if pre.exitCode == 125 => + Future.failed( + DockerContainerId + .parse(pre.stdout) + .map(BrokenDockerContainer(_, s"Broken container: ${pre.getMessage}")) + .getOrElse(pre)) + } + } } def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] = diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala index 8f447cf..9c2b5b3 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala @@ -17,6 +17,8 @@ package whisk.core.containerpool.docker.test +import java.util.concurrent.Semaphore + import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext.Implicits.global @@ -27,26 +29,31 @@ import scala.concurrent.Promise import scala.util.Success import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import org.scalatest.Matchers +import org.scalatest.time.{Seconds, Span} import common.StreamLogging + import whisk.common.LogMarker import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD import whisk.common.TransactionId -import whisk.core.containerpool.docker.DockerClient -import whisk.core.containerpool.ContainerId import whisk.core.containerpool.ContainerAddress -import whisk.utils.retry -import whisk.core.containerpool.docker.ProcessRunningException -import whisk.core.containerpool.docker.DockerContainerId +import whisk.core.containerpool.ContainerId import whisk.core.containerpool.docker.BrokenDockerContainer +import whisk.core.containerpool.docker.DockerClient +import 
whisk.core.containerpool.docker.DockerContainerId +import whisk.core.containerpool.docker.ProcessRunningException +import whisk.utils.retry @RunWith(classOf[JUnitRunner]) -class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach { +class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach with Eventually { override def beforeEach = stream.reset() + implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds))) + implicit val transid = TransactionId.testing val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52") @@ -169,6 +176,96 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B } } + it should "limit the number of concurrent docker run invocations" in { + // Delay execution of Docker run command + val firstRunPromise = Promise[String]() + + val firstContainerId = ContainerId("1" * 64) + val secondContainerId = ContainerId("2" * 64) + + var runCmdCount = 0 + val dc = new DockerClient()(global) { + override val dockerCmd = Seq(dockerCommand) + override def executeProcess(args: String*)(implicit ec: ExecutionContext) = { + runCmdCount += 1 + runCmdCount match { + case 1 => firstRunPromise.future + case 2 => Future.successful(secondContainerId.asString) + case _ => Future.failed(new Throwable()) + } + } + // Need to override the semaphore, otherwise the tested code will still + // create the semaphore with the original value of maxParallelRuns. + override val maxParallelRuns = 1 + override val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true) + } + + val image = "image" + val args = Seq("args") + + val firstRunResult = dc.run(image, args) + val secondRunResult = dc.run(image, args) + + // The tested code won't reach the mocked executeProcess() and thus, increase runCmdCount, + // until at least one Future is successfully completed. 
For this reason, it takes + // some time until the following matcher is successful. + eventually { runCmdCount shouldBe 1 } + + // Complete the first Docker run command so that the second is eligible to run + firstRunPromise.success(firstContainerId.asString) + + // Cannot assert that the first Docker run always obtains the first container because + // the tested code uses Futures so that sequence may differ from test run to test run. + val firstResultContainerId = await(firstRunResult) + + // Now, second command should be complete + eventually { runCmdCount shouldBe 2 } + + val secondResultContainerId = await(secondRunResult) + Set(firstResultContainerId, secondResultContainerId) should contain theSameElementsAs Set( + firstContainerId, + secondContainerId) + } + + it should "tolerate docker run errors when limiting the number of concurrent docker run invocations" in { + val secondContainerId = ContainerId("2" * 64) + + var runCmdCount = 0 + val dc = new DockerClient()(global) { + override val dockerCmd = Seq(dockerCommand) + override def executeProcess(args: String*)(implicit ec: ExecutionContext) = { + runCmdCount += 1 + println(s"runCmdCount=${runCmdCount}, args.last=${args.last}") + runCmdCount match { + case 1 => Future.failed(ProcessRunningException(1, "", "")) + case 2 => Future.successful(secondContainerId.asString) + case _ => Future.failed(new Throwable()) + } + } + // Need to override the semaphore, otherwise the tested code will still + // create the semaphore with the original value of maxParallelRuns. + override val maxParallelRuns = 1 + override val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true) + } + + val image = "image" + val args = Seq("args") + + // Kick off the first Docker run command - it will fail. + val firstRunResult = dc.run(image, args) + + an[Exception] should be thrownBy await(firstRunResult) + runCmdCount shouldBe 1 + + // Now kick off the second Docker run command - it is expected to succeed. 
+ // If this command completes without timeout, the concurrency limit properly + // deals with errors. + val secondRunResult = dc.run(image, args) + + await(secondRunResult) shouldBe secondContainerId + runCmdCount shouldBe 2 + } + it should "write proper log markers on a successful command" in { // a dummy string works here as we do not assert any output // from the methods below -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.