This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 0d22b99 Make REST communication with action containers more robust. (#3710) 0d22b99 is described below commit 0d22b9976e22bbe37bd77ba5afd3836870870d52 Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Tue May 29 15:49:52 2018 +0200 Make REST communication with action containers more robust. (#3710) On systems with high load, POSTing the action container `/init` endpoint occasionally fails with `NoRouteToHostException`. Retry if this exception occurs. --- .../scala/whisk/core/containerpool/HttpUtils.scala | 60 +++++++++++++++------- .../scala/whisk/core/entity/ActivationResult.scala | 2 +- .../scala/actionContainers/ActionContainer.scala | 20 +++++--- .../docker/test/ContainerConnectionTests.scala | 14 +++-- .../docker/test/DockerContainerTests.scala | 4 +- .../kubernetes/test/KubernetesContainerTests.scala | 4 +- .../core/entity/test/ActivationResponseTests.scala | 4 +- 7 files changed, 71 insertions(+), 37 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala index e0fd37f..07c6fc7 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala @@ -17,14 +17,15 @@ package whisk.core.containerpool +import java.net.NoRouteToHostException import java.nio.charset.StandardCharsets -import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace import scala.util.Failure import scala.util.Success import scala.util.Try - import org.apache.commons.io.IOUtils import org.apache.http.HttpHeaders import org.apache.http.client.config.RequestConfig @@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder import 
org.apache.http.conn.HttpHostConnectException import org.apache.http.entity.StringEntity import org.apache.http.impl.client.HttpClientBuilder - import spray.json._ +import whisk.common.Logging +import whisk.common.TransactionId import whisk.core.entity.ActivationResponse._ import whisk.core.entity.ByteSize import whisk.core.entity.size.SizeLong @@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong * @param timeout the timeout in msecs to wait for a response * @param maxResponse the maximum size in bytes the connection will accept */ -protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize) { +protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize)( + implicit logging: Logging) { def close() = Try(connection.close()) @@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe * @param retry whether or not to retry on connection failure * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) */ - def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerHttpError, ContainerResponse] = { + def post(endpoint: String, body: JsValue, retry: Boolean)( + implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = { val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8) entity.setContentType("application/json") @@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe request.addHeader(HttpHeaders.ACCEPT, "application/json") request.setEntity(entity) - execute(request, timeout.toMillis.toInt, retry) + execute(request, timeout, retry) } - private def execute(request: HttpRequestBase, - timeoutMsec: Integer, - retry: Boolean): Either[ContainerHttpError, ContainerResponse] = { + // Used internally to wrap all exceptions for which the request can be retried + private case class RetryableConnectionError(t: Throwable) extends Exception(t) 
with NoStackTrace + + // Annotation will make the compiler complain if no tail recursion is possible + @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, retry: Boolean)( + implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = { Try(connection.execute(request)).map { response => val containerResponse = Option(response.getEntity) .map { entity => @@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe response.close() containerResponse + } recoverWith { + // The route to target socket as well as the target socket itself may need some time to be available - + // particularly on a loaded system. + // The following exceptions occur on such transient conditions. In addition, no data has been transmitted + // yet if these exceptions occur. For this reason, it is safe and reasonable to retry. + // + // HttpHostConnectException: no target socket is listening (yet). + case t: HttpHostConnectException => Failure(RetryableConnectionError(t)) + // + // NoRouteToHostException: route to target host is not known (yet). 
+ case t: NoRouteToHostException => Failure(RetryableConnectionError(t)) } match { - case Success(r) => r - case Failure(t: HttpHostConnectException) if retry => - if (timeoutMsec > 0) { - Thread sleep 100 - val newTimeout = timeoutMsec - 100 - execute(request, newTimeout, retry) + case Success(response) => response + case Failure(t: RetryableConnectionError) if retry => + val sleepTime = 10.milliseconds + if (timeout > Duration.Zero) { + logging.info(this, s"POST failed with ${t} - retrying after sleeping ${sleepTime}.") + Thread.sleep(sleepTime.toMillis) + val newTimeout = timeout - sleepTime + execute(request, newTimeout, retry = true) } else { - Left(Timeout()) + logging.warn(this, s"POST failed with ${t} - no retry because timeout exceeded.") + Left(Timeout(t)) } case Failure(t: Throwable) => Left(ConnectionError(t)) } @@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, maxRe object HttpUtils { /** A helper method to post one single request to a connection. Used for container tests. 
*/ - def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = { + def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging, + tid: TransactionId): (Int, Option[JsObject]) = { val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB) val response = connection.post(endPoint, content, retry = true) connection.close() response match { case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption) case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container") - case Left(Timeout()) => throw new java.util.concurrent.TimeoutException() + case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException() case Left(ConnectionError(t: java.net.SocketTimeoutException)) => throw new java.util.concurrent.TimeoutException() case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage) diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala index 85a9f36..5f8c815 100644 --- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala +++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala @@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { protected[core] sealed trait ContainerHttpError extends ContainerConnectionError protected[core] case class ConnectionError(t: Throwable) extends ContainerHttpError protected[core] case class NoResponseReceived() extends ContainerHttpError - protected[core] case class Timeout() extends ContainerHttpError + protected[core] case class Timeout(t: Throwable) extends ContainerHttpError protected[core] case class MemoryExhausted() extends ContainerConnectionError diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala index 3ee1f47..97915c8 100644 --- 
a/tests/src/test/scala/actionContainers/ActionContainer.scala +++ b/tests/src/test/scala/actionContainers/ActionContainer.scala @@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger import scala.sys.process.stringToProcess import scala.util.Random import scala.util.{Failure, Success} - import org.apache.commons.lang3.StringUtils -import org.scalatest.FlatSpec -import org.scalatest.Matchers - +import org.scalatest.{FlatSpec, Matchers} import akka.actor.ActorSystem import spray.json._ +import common.StreamLogging +import whisk.common.Logging +import whisk.common.TransactionId import whisk.core.entity.Exec /** @@ -49,7 +49,7 @@ trait ActionContainer { def run(value: JsValue): (Int, Option[JsObject]) } -trait ActionProxyContainerTestUtils extends FlatSpec with Matchers { +trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with StreamLogging { import ActionContainer.{filterSentinel, sentinel} def initPayload(code: String, main: String = "main"): JsObject = @@ -149,8 +149,8 @@ object ActionContainer { val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim - def withContainer(imageName: String, environment: Map[String, String] = Map.empty)(code: ActionContainer => Unit)( - implicit actorSystem: ActorSystem): (String, String) = { + def withContainer(imageName: String, environment: Map[String, String] = Map.empty)( + code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging: Logging): (String, String) = { val rand = { val r = Random.nextInt; if (r < 0) -r else r } val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand val envArgs = environment.toSeq @@ -204,7 +204,11 @@ object ActionContainer { } } - private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = { + private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)( + implicit logging: Logging): (Int, Option[JsObject]) = { 
+ + implicit val transid = TransactionId.testing + whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content) } } diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala index 0e0867a..9575422 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets import java.time.Instant import scala.concurrent.duration._ - import org.apache.http.HttpRequest import org.apache.http.HttpResponse import org.apache.http.entity.StringEntity @@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec import org.scalatest.Matchers - import spray.json.JsObject +import common.StreamLogging +import whisk.common.TransactionId import whisk.core.containerpool.HttpUtils import whisk.core.entity.size._ import whisk.core.entity.ActivationResponse._ @@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._ * Unit tests for HttpUtils which communicate with containers. 
*/ @RunWith(classOf[JUnitRunner]) -class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterAll { +class ContainerConnectionTests + extends FlatSpec + with Matchers + with BeforeAndAfter + with BeforeAndAfterAll + with StreamLogging { + + implicit val transid = TransactionId.testing var testHang: FiniteDuration = 0.second var testStatusCode: Int = 200 @@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers with BeforeAndAfte testHang = 0.second testStatusCode = 200 testResponse = null + stream.reset() } override def afterAll = { diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 25bc0c9..b3aa2b0 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -383,7 +383,7 @@ class DockerContainerTests val interval = intervalOf(initTimeout + 1.nanoseconds) val container = dockerContainer() { - Future.successful(RunResult(interval, Left(Timeout()))) + Future.successful(RunResult(interval, Left(Timeout(new Throwable())))) } val init = container.initialize(JsObject(), initTimeout) @@ -434,7 +434,7 @@ class DockerContainerTests val interval = intervalOf(runTimeout + 1.nanoseconds) val container = dockerContainer() { - Future.successful(RunResult(interval, Left(Timeout()))) + Future.successful(RunResult(interval, Left(Timeout(new Throwable())))) } val runResult = container.run(JsObject(), JsObject(), runTimeout) diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala index fc3ba68..a6e5d20 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala +++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala @@ -235,7 +235,7 @@ class KubernetesContainerTests val interval = intervalOf(initTimeout + 1.nanoseconds) val container = kubernetesContainer() { - Future.successful(RunResult(interval, Left(Timeout()))) + Future.successful(RunResult(interval, Left(Timeout(new Throwable())))) } val init = container.initialize(JsObject(), initTimeout) @@ -284,7 +284,7 @@ class KubernetesContainerTests val interval = intervalOf(runTimeout + 1.nanoseconds) val container = kubernetesContainer() { - Future.successful(RunResult(interval, Left(Timeout()))) + Future.successful(RunResult(interval, Left(Timeout(new Throwable())))) } val runResult = container.run(JsObject(), JsObject(), runTimeout) diff --git a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala index 6e38877..78dd296 100644 --- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala @@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers { } it should "interpret failed init that does not response" in { - Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout()) + Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable())) .map(Left(_)) .foreach { e => val ar = processInitResponseContent(e, logger) @@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with Matchers { } it should "interpret failed run that does not response" in { - Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout()) + Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new Throwable())) .map(Left(_)) .foreach { e => val ar = processRunResponseContent(e, logger) -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.