This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new aace138 Remove play dependence. (#2438) aace138 is described below commit aace138bb08c49b12ce64ba9dac175f3939c7cf5 Author: rodric rabbah <rod...@gmail.com> AuthorDate: Fri Jul 14 03:43:28 2017 -0400 Remove play dependence. (#2438) --- .../scala/whisk/core/entity/ActivationResult.scala | 18 +++- .../scala/whisk/core/container/HttpUtils.scala | 27 ++++- .../whisk/core/container/WhiskContainer.scala | 2 +- .../containerpool/docker/DockerContainer.scala | 7 +- tests/build.gradle | 1 - .../scala/actionContainers/ActionContainer.scala | 23 +--- tests/src/test/scala/common/AkkaHttpUtils.scala | 116 --------------------- 7 files changed, 44 insertions(+), 150 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala index 6d78dad..aef644e 100644 --- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala +++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala @@ -19,6 +19,7 @@ package whisk.core.entity import scala.util.Try +import spray.http.StatusCodes.OK import spray.json._ import spray.json.DefaultJsonProtocol import whisk.common.Logging @@ -96,12 +97,15 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { protected[core] case class ConnectionError(t: Throwable) extends ContainerConnectionError protected[core] case class NoResponseReceived() extends ContainerConnectionError protected[core] case class Timeout() extends ContainerConnectionError + /** - * @param okStatus the container response was OK (HTTP 200 status code), anything else is considered an error + * @param statusCode the container HTTP response code (e.g., 200 OK) * @param entity the entity response as string * @param truncated either None to indicate complete entity or Some(actual length, max allowed) */ - protected[core] case class ContainerResponse(okStatus: Boolean, entity: String, truncated: Option[(ByteSize, ByteSize)] = None) { + protected[core] case class ContainerResponse(statusCode: Int, entity: String, truncated: Option[(ByteSize, ByteSize)]) { + /** true iff status code is OK (HTTP 200 status code), anything else is considered an error. **/ + val okStatus = statusCode == OK.intValue val ok = okStatus && truncated.isEmpty override def toString = { val base = if (okStatus) "ok" else "not ok" @@ -110,6 +114,12 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { } } + protected[core] object ContainerResponse { + def apply(okStatus: Boolean, entity: String, truncated: Option[(ByteSize, ByteSize)] = None): ContainerResponse = { + ContainerResponse(if (okStatus) OK.intValue else 500, entity, truncated) + } + } + /** * Interprets response from container after initialization. This method is only called when the initialization failed. * @@ -149,14 +159,14 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { */ protected[core] def processRunResponseContent(response: Either[ContainerConnectionError, ContainerResponse], logger: Logging): ActivationResponse = { response match { - case Right(ContainerResponse(okStatus, str, truncated)) => truncated match { + case Right(res @ ContainerResponse(_, str, truncated)) => truncated match { case None => Try { str.parseJson.asJsObject } match { case scala.util.Success(result @ JsObject(fields)) => // If the response is a JSON object container an error field, accept it as the response error. val errorOpt = fields.get(ERROR_FIELD) - if (okStatus) { + if (res.okStatus) { errorOpt map { error => applicationError(error) } getOrElse { diff --git a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala index e53ddb2..1530d69 100644 --- a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala +++ b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala @@ -20,6 +20,7 @@ package whisk.core.container import java.nio.charset.StandardCharsets import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationInt import scala.util.Failure import scala.util.Success import scala.util.Try @@ -34,7 +35,8 @@ import org.apache.http.conn.HttpHostConnectException import org.apache.http.entity.StringEntity import org.apache.http.impl.client.HttpClientBuilder -import spray.json.JsObject +import spray.json._ + import whisk.core.entity.ActivationResponse._ import whisk.core.entity.ByteSize import whisk.core.entity.size.SizeLong @@ -56,7 +58,7 @@ protected[core] class HttpUtils( timeout: FiniteDuration, maxResponse: ByteSize) { - def close = Try(connection.close) + def close() = Try(connection.close()) /** * Posts to hostname/endpoint the given JSON object. @@ -66,11 +68,11 @@ protected[core] class HttpUtils( * wait longer than the total timeout (within a small slack allowance). * * @param endpoint the path the api call relative to hostname - * @param body the json object to post + * @param body the JSON value to post (this is usually a JSON objecT) * @param retry whether or not to retry on connection failure * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) */ - def post(endpoint: String, body: JsObject, retry: Boolean): Either[ContainerConnectionError, ContainerResponse] = { + def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerConnectionError, ContainerResponse] = { val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8) entity.setContentType("application/json") @@ -92,7 +94,7 @@ protected[core] class HttpUtils( val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead) val str = new String(bytes, StandardCharsets.UTF_8) val truncated = if (contentLength <= maxResponseBytes) None else Some(contentLength.B, maxResponse) - Right(ContainerResponse(statusCode == 200, str, truncated)) + Right(ContainerResponse(statusCode, str, truncated)) } else { Left(NoResponseReceived()) } @@ -135,3 +137,18 @@ protected[core] class HttpUtils( .useSystemProperties() .build } + +object HttpUtils { + /** A helper method to post one single request to a connection. Used for container tests. */ + def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = { + val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB) + val response = connection.post(endPoint, content, retry = true) + connection.close() + response match { + case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption) + case Left(Timeout()) => throw new java.util.concurrent.TimeoutException() + case Left(ConnectionError(t: java.net.SocketTimeoutException)) => throw new java.util.concurrent.TimeoutException() + case _ => throw new IllegalStateException() + } + } +} diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala index 4685693..49a4639 100644 --- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala @@ -120,7 +120,7 @@ class WhiskContainer( * Tear down the container and retrieve the logs. */ def teardown()(implicit transid: TransactionId): String = { - connection.foreach(_.close) + connection.foreach(_.close()) getContainerLogs(containerName).toOption.getOrElse("none") } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala index 4310f91..869c227 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala @@ -76,7 +76,7 @@ object DockerContainer { case (key, value) => Seq("-e", s"$key=$value") }.flatten - val dnsArgs = dnsServers.map(Seq("--dns",_)).flatten + val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten val args = Seq( "--cap-drop", "NET_RAW", @@ -137,7 +137,10 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)( def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id) def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id) - def destroy()(implicit transid: TransactionId): Future[Unit] = docker.rm(id) + def destroy()(implicit transid: TransactionId): Future[Unit] = { + httpConnection.foreach(_.close()) + docker.rm(id) + } def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = { val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $ip") diff --git a/tests/build.gradle b/tests/build.gradle index 3a07e70..b33e2c6 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -52,7 +52,6 @@ dependencies { compile 'com.typesafe.akka:akka-testkit_2.11:2.4.16' compile 'com.google.code.gson:gson:2.3.1' compile 'org.scalamock:scalamock-scalatest-support_2.11:3.4.2' - compile 'com.typesafe.play:play-ws_2.11:2.5.11' compile project(':common:scala') compile project(':core:controller') diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala index 978114d..e10c792 100644 --- a/tests/src/test/scala/actionContainers/ActionContainer.scala +++ b/tests/src/test/scala/actionContainers/ActionContainer.scala @@ -31,7 +31,6 @@ import scala.language.postfixOps import scala.sys.process.ProcessLogger import scala.sys.process.stringToProcess import scala.util.Random -import scala.util.Try import org.apache.commons.lang3.StringUtils import org.scalatest.FlatSpec @@ -159,26 +158,8 @@ object ActionContainer { } } - private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)( - implicit actorSystem: ActorSystem): (Int, Option[JsObject]) = { - import akka.http.scaladsl.model._ - import akka.http.scaladsl.unmarshalling._ - import akka.stream.ActorMaterializer - import common.AkkaHttpUtils - - implicit val materializer = ActorMaterializer() - - val uri = Uri( - scheme = "http", - authority = Uri.Authority(host = Uri.Host(host), port = port), - path = Uri.Path(endPoint)) - - val f = for ( - response <- AkkaHttpUtils.singleRequest(uri.toString(), content, 90.seconds, retryOnTCPErrors = true); - responseBody <- Unmarshal(response.body).to[String] - ) yield (response.status.intValue, Try(responseBody.parseJson.asJsObject).toOption) - - Await.result(f, 90.seconds) + private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = { + whisk.core.container.HttpUtils.post(host, port, endPoint, content) } private class ActionContainerImpl() extends ActionContainer { diff --git a/tests/src/test/scala/common/AkkaHttpUtils.scala b/tests/src/test/scala/common/AkkaHttpUtils.scala deleted file mode 100644 index 46faa69..0000000 --- a/tests/src/test/scala/common/AkkaHttpUtils.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package common - -import java.util.concurrent.TimeoutException - -import akka.actor.ActorSystem -import akka.http.scaladsl.model._ -import akka.stream.ActorMaterializer -import play.api.http.{ContentTypeOf, Writeable} -import play.api.libs.ws.WSResponse -import play.api.libs.ws.ahc.AhcWSClient -import play.api.mvc.Codec -import spray.json.JsValue - -import scala.concurrent.{Await, Future, Promise} -import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} -import scala.util.Try - -object AkkaHttpUtils { - - // Writable for spray-json is required - implicit def sprayJsonContentType(implicit codec: Codec): ContentTypeOf[JsValue] = { - ContentTypeOf[JsValue](Some(ContentTypes.`application/json`.toString())) - } - implicit def sprayJsonWriteable(implicit codec: Codec): Writeable[JsValue] = { - Writeable(message => codec.encode(message.toString())) - } - - def singleRequestBlocking( - uri: String, - content: JsValue, - timeout: FiniteDuration, - retryOnTCPErrors: Boolean = false, - retryOn4xxErrors: Boolean = false, - retryOn5xxErrors: Boolean = false, - retryInterval: FiniteDuration = 100.milliseconds) - (implicit system: ActorSystem) : Try[WSResponse] = { - - val f = singleRequest( - uri, content, timeout, retryOnTCPErrors, retryOn4xxErrors, retryOn5xxErrors, retryInterval - ) - - // Duration.Inf is not an issue, since singleRequest has a built-in timeout mechanism. - Await.ready(f, Duration.Inf) - - f.value.get - } - - // Makes a request, expects a successful within timeout, retries on selected - // errors until timeout has passed. - def singleRequest( - uri: String, - content: JsValue, - timeout: FiniteDuration, - retryOnTCPErrors: Boolean = false, - retryOn4xxErrors: Boolean = false, - retryOn5xxErrors: Boolean = false, - retryInterval: FiniteDuration = 100.milliseconds) - (implicit system: ActorSystem) : Future[WSResponse] = { - implicit val executionContext = system.dispatcher - implicit val materializer = ActorMaterializer() - val wsClient = AhcWSClient() - - val timeoutException = new TimeoutException(s"Request to ${uri} could not be completed in time.") - - val promise = Promise[WSResponse] - - // Timeout includes all retries. - system.scheduler.scheduleOnce(timeout) { - promise.tryFailure(timeoutException) - } - - def tryOnce() : Unit = if(!promise.isCompleted) { - val f = wsClient.url(uri).withRequestTimeout(timeout).post(content) - f.onSuccess { - case r if r.status >= 400 && r.status < 500 && retryOn4xxErrors => - system.scheduler.scheduleOnce(retryInterval) { tryOnce() } - case r if r.status >= 500 && r.status < 600 && retryOn5xxErrors => - system.scheduler.scheduleOnce(retryInterval) { tryOnce() } - case r => - wsClient.close() - promise.trySuccess(r) - } - - f.onFailure { - case s : java.net.ConnectException if retryOnTCPErrors => - // TCP error (e.g. connection couldn't be opened) - system.scheduler.scheduleOnce(retryInterval) { tryOnce() } - - case t : Throwable => - // Other error. We fail the promise. - promise.tryFailure(t) - } - } - - tryOnce() - - promise.future - } -} -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].