This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d22b99  Make REST communication with action containers more robust. (#3710)
0d22b99 is described below

commit 0d22b9976e22bbe37bd77ba5afd3836870870d52
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Tue May 29 15:49:52 2018 +0200

    Make REST communication with action containers more robust. (#3710)
    
    On systems under high load, POSTing to the action container's `/init`
    endpoint occasionally fails with `NoRouteToHostException`. Retry if this
    exception occurs.
---
 .../scala/whisk/core/containerpool/HttpUtils.scala | 60 +++++++++++++++-------
 .../scala/whisk/core/entity/ActivationResult.scala |  2 +-
 .../scala/actionContainers/ActionContainer.scala   | 20 +++++---
 .../docker/test/ContainerConnectionTests.scala     | 14 +++--
 .../docker/test/DockerContainerTests.scala         |  4 +-
 .../kubernetes/test/KubernetesContainerTests.scala |  4 +-
 .../core/entity/test/ActivationResponseTests.scala |  4 +-
 7 files changed, 71 insertions(+), 37 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37f..07c6fc7 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -17,14 +17,15 @@
 
 package whisk.core.containerpool
 
+import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
@@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
-
 import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
@@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong
  * @param timeout the timeout in msecs to wait for a response
  * @param maxResponse the maximum size in bytes the connection will accept
  */
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, 
maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, 
maxResponse: ByteSize)(
+  implicit logging: Logging) {
 
   def close() = Try(connection.close())
 
@@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxRe
    * @param retry whether or not to retry on connection failure
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 
String)
    */
-  def post(endpoint: String, body: JsValue, retry: Boolean): 
Either[ContainerHttpError, ContainerResponse] = {
+  def post(endpoint: String, body: JsValue, retry: Boolean)(
+    implicit tid: TransactionId): Either[ContainerHttpError, 
ContainerResponse] = {
     val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
     entity.setContentType("application/json")
 
@@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxRe
     request.addHeader(HttpHeaders.ACCEPT, "application/json")
     request.setEntity(entity)
 
-    execute(request, timeout.toMillis.toInt, retry)
+    execute(request, timeout, retry)
   }
 
-  private def execute(request: HttpRequestBase,
-                      timeoutMsec: Integer,
-                      retry: Boolean): Either[ContainerHttpError, 
ContainerResponse] = {
+  // Used internally to wrap all exceptions for which the request can be 
retried
+  private case class RetryableConnectionError(t: Throwable) extends 
Exception(t) with NoStackTrace
+
+  // Annotation will make the compiler complain if no tail recursion is 
possible
+  @tailrec private def execute(request: HttpRequestBase, timeout: 
FiniteDuration, retry: Boolean)(
+    implicit tid: TransactionId): Either[ContainerHttpError, 
ContainerResponse] = {
     Try(connection.execute(request)).map { response =>
       val containerResponse = Option(response.getEntity)
         .map { entity =>
@@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String, 
timeout: FiniteDuration, maxRe
 
       response.close()
       containerResponse
+    } recoverWith {
+      // The route to target socket as well as the target socket itself may 
need some time to be available -
+      // particularly on a loaded system.
+      // The following exceptions occur on such transient conditions. In 
addition, no data has been transmitted
+      // yet if these exceptions occur. For this reason, it is safe and 
reasonable to retry.
+      //
+      // HttpHostConnectException: no target socket is listening (yet).
+      case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
+      //
+      // NoRouteToHostException: route to target host is not known (yet).
+      case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
     } match {
-      case Success(r) => r
-      case Failure(t: HttpHostConnectException) if retry =>
-        if (timeoutMsec > 0) {
-          Thread sleep 100
-          val newTimeout = timeoutMsec - 100
-          execute(request, newTimeout, retry)
+      case Success(response) => response
+      case Failure(t: RetryableConnectionError) if retry =>
+        val sleepTime = 10.milliseconds
+        if (timeout > Duration.Zero) {
+          logging.info(this, s"POST failed with ${t} - retrying after sleeping 
${sleepTime}.")
+          Thread.sleep(sleepTime.toMillis)
+          val newTimeout = timeout - sleepTime
+          execute(request, newTimeout, retry = true)
         } else {
-          Left(Timeout())
+          logging.warn(this, s"POST failed with ${t} - no retry because 
timeout exceeded.")
+          Left(Timeout(t))
         }
       case Failure(t: Throwable) => Left(ConnectionError(t))
     }
@@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String, 
timeout: FiniteDuration, maxRe
 object HttpUtils {
 
   /** A helper method to post one single request to a connection. Used for 
container tests. */
-  def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, 
Option[JsObject]) = {
+  def post(host: String, port: Int, endPoint: String, content: 
JsValue)(implicit logging: Logging,
+                                                                        tid: 
TransactionId): (Int, Option[JsObject]) = {
     val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
     val response = connection.post(endPoint, content, retry = true)
     connection.close()
     response match {
       case Right(r)                   => (r.statusCode, 
Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no 
response from container")
-      case Left(Timeout())            => throw new 
java.util.concurrent.TimeoutException()
+      case Left(Timeout(_))           => throw new 
java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new 
IllegalStateException(t.getMessage)
diff --git 
a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala 
b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 85a9f36..5f8c815 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
   protected[core] sealed trait ContainerHttpError extends 
ContainerConnectionError
   protected[core] case class ConnectionError(t: Throwable) extends 
ContainerHttpError
   protected[core] case class NoResponseReceived() extends ContainerHttpError
-  protected[core] case class Timeout() extends ContainerHttpError
+  protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
 
   protected[core] case class MemoryExhausted() extends ContainerConnectionError
 
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala 
b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 3ee1f47..97915c8 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger
 import scala.sys.process.stringToProcess
 import scala.util.Random
 import scala.util.{Failure, Success}
-
 import org.apache.commons.lang3.StringUtils
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-
+import org.scalatest.{FlatSpec, Matchers}
 import akka.actor.ActorSystem
 import spray.json._
+import common.StreamLogging
+import whisk.common.Logging
+import whisk.common.TransactionId
 import whisk.core.entity.Exec
 
 /**
@@ -49,7 +49,7 @@ trait ActionContainer {
   def run(value: JsValue): (Int, Option[JsObject])
 }
 
-trait ActionProxyContainerTestUtils extends FlatSpec with Matchers {
+trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with 
StreamLogging {
   import ActionContainer.{filterSentinel, sentinel}
 
   def initPayload(code: String, main: String = "main"): JsObject =
@@ -149,8 +149,8 @@ object ActionContainer {
   val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
   def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
 
-  def withContainer(imageName: String, environment: Map[String, String] = 
Map.empty)(code: ActionContainer => Unit)(
-    implicit actorSystem: ActorSystem): (String, String) = {
+  def withContainer(imageName: String, environment: Map[String, String] = 
Map.empty)(
+    code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging: 
Logging): (String, String) = {
     val rand = { val r = Random.nextInt; if (r < 0) -r else r }
     val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand
     val envArgs = environment.toSeq
@@ -204,7 +204,11 @@ object ActionContainer {
     }
   }
 
-  private def syncPost(host: String, port: Int, endPoint: String, content: 
JsValue): (Int, Option[JsObject]) = {
+  private def syncPost(host: String, port: Int, endPoint: String, content: 
JsValue)(
+    implicit logging: Logging): (Int, Option[JsObject]) = {
+
+    implicit val transid = TransactionId.testing
+
     whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
   }
 }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a..9575422 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.time.Instant
 
 import scala.concurrent.duration._
-
 import org.apache.http.HttpRequest
 import org.apache.http.HttpResponse
 import org.apache.http.entity.StringEntity
@@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
-
 import spray.json.JsObject
+import common.StreamLogging
+import whisk.common.TransactionId
 import whisk.core.containerpool.HttpUtils
 import whisk.core.entity.size._
 import whisk.core.entity.ActivationResponse._
@@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._
  * Unit tests for HttpUtils which communicate with containers.
  */
 @RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests extends FlatSpec with Matchers with 
BeforeAndAfter with BeforeAndAfterAll {
+class ContainerConnectionTests
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfter
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  implicit val transid = TransactionId.testing
 
   var testHang: FiniteDuration = 0.second
   var testStatusCode: Int = 200
@@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers 
with BeforeAndAfte
     testHang = 0.second
     testStatusCode = 200
     testResponse = null
+    stream.reset()
   }
 
   override def afterAll = {
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c9..b3aa2b0 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -383,7 +383,7 @@ class DockerContainerTests
     val interval = intervalOf(initTimeout + 1.nanoseconds)
 
     val container = dockerContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val init = container.initialize(JsObject(), initTimeout)
@@ -434,7 +434,7 @@ class DockerContainerTests
     val interval = intervalOf(runTimeout + 1.nanoseconds)
 
     val container = dockerContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba68..a6e5d20 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -235,7 +235,7 @@ class KubernetesContainerTests
     val interval = intervalOf(initTimeout + 1.nanoseconds)
 
     val container = kubernetesContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val init = container.initialize(JsObject(), initTimeout)
@@ -284,7 +284,7 @@ class KubernetesContainerTests
     val interval = intervalOf(runTimeout + 1.nanoseconds)
 
     val container = kubernetesContainer() {
-      Future.successful(RunResult(interval, Left(Timeout())))
+      Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
     }
 
     val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git 
a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala 
b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index 6e38877..78dd296 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
   }
 
   it should "interpret failed init that does not response" in {
-    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new 
Throwable()))
       .map(Left(_))
       .foreach { e =>
         val ar = processInitResponseContent(e, logger)
@@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with 
Matchers {
   }
 
   it should "interpret failed run that does not response" in {
-    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+    Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new 
Throwable()))
       .map(Left(_))
       .foreach { e =>
         val ar = processRunResponseContent(e, logger)

-- 
To stop receiving notification emails like this one, please contact
markusthoem...@apache.org.

Reply via email to