This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 369517b Factor PoolingRestClient out of CouchDbRestClient. (#3347) 369517b is described below commit 369517b369453366c3aeb095fba79d1c4411f8ea Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Tue Feb 27 14:00:35 2018 -0500 Factor PoolingRestClient out of CouchDbRestClient. (#3347) --- .../whisk/core/database/CloudantRestClient.scala | 2 +- .../whisk/core/database/CouchDbRestClient.scala | 160 +++------------------ .../main/scala/whisk/http/PoolingRestClient.scala | 154 ++++++++++++++++++++ .../database/test/ExtendedCouchDbRestClient.scala | 11 +- 4 files changed, 183 insertions(+), 144 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala index 29ec17e..682df26 100644 --- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala @@ -37,6 +37,6 @@ class CloudantRestClient(host: String, port: Int, username: String, password: St // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = { - requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc)) + requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders)) } } diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala index 0f791b2..e38ce6f 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala @@ -17,28 +17,22 @@ package whisk.core.database +import scala.concurrent.Future + import java.net.URLEncoder import java.nio.charset.StandardCharsets -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.util.{Failure, Success} - import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.HostConnectionPool -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.unmarshalling._ -import akka.stream.ActorMaterializer -import akka.stream.OverflowStrategy -import akka.stream.QueueOfferResult import akka.stream.scaladsl._ import akka.util.ByteString + import spray.json._ +import spray.json.DefaultJsonProtocol._ + import whisk.common.Logging +import whisk.http.PoolingRestClient /** * This class only handles the basic communication to the proper endpoints @@ -50,35 +44,14 @@ import whisk.common.Logging */ class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)( implicit system: ActorSystem, - logging: Logging) { - require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") - - private implicit val context = system.dispatcher - private implicit val materializer = ActorMaterializer() + logging: Logging) + extends PoolingRestClient(protocol, host, port, 16 * 1024) { - // Creates or retrieves a connection pool for the host. - private val pool = if (protocol == "http") { - Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port) - } else { - Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) - } - - private val poolPromise = Promise[HostConnectionPool] + // Headers common to all requests. + val baseHeaders: List[HttpHeader] = + List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`)) - // Additional queue in case all connections are busy. Should hardly ever be - // filled in practice but can be useful, e.g., in tests starting many - // asynchronous requests in a very short period of time. - private val QUEUE_SIZE = 16 * 1024; - private val requestQueue = Source - .queue(QUEUE_SIZE, OverflowStrategy.dropNew) - .via(pool.mapMaterializedValue { x => - poolPromise.success(x); x - }) - .toMat(Sink.foreach({ - case ((Success(response), p)) => p.success(response) - case ((Failure(error), p)) => p.failure(error) - }))(Keep.left) - .run + def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev)))) // Properly encodes the potential slashes in each segment. protected def uri(segments: Any*): Uri = { @@ -86,103 +59,30 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str Uri(s"/${encodedSegments.mkString("/")}") } - // Headers common to all requests. - private val baseHeaders = - List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`)) - - // Prepares a request with the proper headers. - private def mkRequest0(method: HttpMethod, - uri: Uri, - body: Future[MessageEntity], - forRev: Option[String] = None): Future[HttpRequest] = { - val revHeader = forRev.map(r => `If-Match`(EntityTagRange(EntityTag(r)))).toList - val headers = revHeader ::: baseHeaders - body.map { b => - HttpRequest(method = method, uri = uri, headers = headers, entity = b) - } - } - - protected def mkRequest(method: HttpMethod, uri: Uri, forRev: Option[String] = None): Future[HttpRequest] = { - mkRequest0(method, uri, Future.successful(HttpEntity.Empty), forRev = forRev) - } - - protected def mkJsonRequest(method: HttpMethod, - uri: Uri, - body: JsValue, - forRev: Option[String] = None): Future[HttpRequest] = { - val b = Marshal(body).to[MessageEntity] - mkRequest0(method, uri, b, forRev = forRev) - } - - // Enqueue a request, and return a future capturing the corresponding response. - // WARNING: make sure that if the future response is not failed, its entity - // be drained entirely or the connection will be kept open until timeouts kick in. - private def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { - futureRequest flatMap { request => - val promise = Promise[HttpResponse] - - // When the future completes, we know whether the request made it - // through the queue. - requestQueue.offer(request -> promise).flatMap { buffered => - buffered match { - case QueueOfferResult.Enqueued => - promise.future - - case QueueOfferResult.Dropped => - Future.failed(new Exception("DB request queue is full.")) - - case QueueOfferResult.QueueClosed => - Future.failed(new Exception("DB request queue was closed.")) - - case QueueOfferResult.Failure(f) => - Future.failed(f) - } - } - } - } - - // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. - protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { - request0(futureRequest) flatMap { response => - if (response.status.isSuccess()) { - Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => - Right(o) - } - } else { - // This is important, as it drains the entity stream. - // Otherwise the connection stays open and the pool dries up. - response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ => - Left(response.status) - } - } - } - } - - import spray.json.DefaultJsonProtocol._ - // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid def putDoc(id: String, doc: JsObject): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc)) + requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid def putDoc(id: String, rev: String, doc: JsObject): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, forRev = Some(rev))) + requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#inserting-documents-in-bulk def putDocs(docs: Seq[JsObject]): Future[Either[StatusCode, JsArray]] = - requestJson[JsArray](mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson))) + requestJson[JsArray]( + mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id))) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), forRev = Some(rev))) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), forRev = Some(rev))) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil, @@ -238,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri)) + requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders)) } // Streams an attachment to the database @@ -249,7 +149,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str contentType: ContentType, source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = { val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs))) - val request = mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), forRev = Some(rev)) + val request = + mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev)) requestJson[JsObject](request) } @@ -259,7 +160,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str rev: String, attName: String, sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = { - val request = mkRequest(HttpMethods.GET, uri(db, id, attName), forRev = Some(rev)) + val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev)) request0(request) flatMap { response => if (response.status.isSuccess()) { @@ -269,19 +170,4 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str } } } - - def shutdown(): Future[Unit] = { - materializer.shutdown() - // The code below shuts down the pool, but is apparently not tolerant - // to multiple clients shutting down the same pool (the second one just - // hangs). Given that shutdown is only relevant for tests (unused pools - // close themselves anyway after some time) and that they can call - // Http().shutdownAllConnectionPools(), this is not a major issue. - /* Reintroduce below if they ever make HostConnectionPool.shutdown() - * safe to call >1x. - * val poolOpt = poolPromise.future.value.map(_.toOption).flatten - * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(())) - */ - Future.successful(()) - } } diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala new file mode 100644 index 0000000..ef09770 --- /dev/null +++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.http + +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.{Failure, Success} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.HostConnectionPool +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling._ +import akka.stream.ActorMaterializer +import akka.stream.OverflowStrategy +import akka.stream.QueueOfferResult +import akka.stream.scaladsl._ + +import spray.json._ + +/** + * This class only handles the basic communication to the proper endpoints. + * It is up to its clients to interpret the results. It is built on akka-http + * host-level connection pools; compared to single requests, it saves some time + * on each request because it doesn't need to look up the pool corresponding + * to the host. It is also easier to add an extra queueing mechanism. + */ +class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) { + require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") + + implicit val context = system.dispatcher + implicit val materializer = ActorMaterializer() + + // Creates or retrieves a connection pool for the host. + private val pool = if (protocol == "http") { + Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port) + } else { + Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) + } + + private val poolPromise = Promise[HostConnectionPool] + + // Additional queue in case all connections are busy. Should hardly ever be + // filled in practice but can be useful, e.g., in tests starting many + // asynchronous requests in a very short period of time. + private val requestQueue = Source + .queue(queueSize, OverflowStrategy.dropNew) + .via(pool.mapMaterializedValue { x => + poolPromise.success(x); x + }) + .toMat(Sink.foreach({ + case ((Success(response), p)) => p.success(response) + case ((Failure(error), p)) => p.failure(error) + }))(Keep.left) + .run + + // Prepares a request with the proper headers. + def mkRequest0(method: HttpMethod, + uri: Uri, + body: Future[MessageEntity], + headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + body.map { b => + HttpRequest(method, uri, headers, b) + } + } + + protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers) + } + + protected def mkJsonRequest(method: HttpMethod, + uri: Uri, + body: JsValue, + headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { + val b = Marshal(body).to[MessageEntity] + mkRequest0(method, uri, b, headers) + } + + // Enqueue a request, and return a future capturing the corresponding response. + // WARNING: make sure that if the future response is not failed, its entity + // be drained entirely or the connection will be kept open until timeouts kick in. + def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { + futureRequest flatMap { request => + val promise = Promise[HttpResponse] + + // When the future completes, we know whether the request made it + // through the queue. + requestQueue.offer(request -> promise).flatMap { buffered => + buffered match { + case QueueOfferResult.Enqueued => + promise.future + + case QueueOfferResult.Dropped => + Future.failed(new Exception("DB request queue is full.")) + + case QueueOfferResult.QueueClosed => + Future.failed(new Exception("DB request queue was closed.")) + + case QueueOfferResult.Failure(f) => + Future.failed(f) + } + } + } + } + + // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. + protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { + request0(futureRequest) flatMap { response => + if (response.status.isSuccess()) { + Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => + Right(o) + } + } else { + // This is important, as it drains the entity stream. + // Otherwise the connection stays open and the pool dries up. + response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ => + Left(response.status) + } + } + } + } + + def shutdown(): Future[Unit] = { + materializer.shutdown() + // The code below shuts down the pool, but is apparently not tolerant + // to multiple clients shutting down the same pool (the second one just + // hangs). Given that shutdown is only relevant for tests (unused pools + // close themselves anyway after some time) and that they can call + // Http().shutdownAllConnectionPools(), this is not a major issue. + /* Reintroduce below if they ever make HostConnectionPool.shutdown() + * safe to call >1x. + * val poolOpt = poolPromise.future.value.map(_.toOption).flatten + * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(())) + */ + Future.successful(()) + } +} diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala index 3b6219b..d554e31 100644 --- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala +++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala @@ -39,23 +39,22 @@ class ExtendedCouchDbRestClient(protocol: String, // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get-- def instanceInfo(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./)) + requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs def dbs(): Future[Either[StatusCode, List[String]]] = { - implicit val ec = system.dispatcher - requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"))).map { either => + requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either => either.right.map(_.convertTo[List[String]]) } } // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db def createDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db))) + requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db def deleteDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db))) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs def getAllDocs(skip: Option[Int] = None, @@ -76,6 +75,6 @@ class ExtendedCouchDbRestClient(protocol: String, .toMap val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, url)) + requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders)) } } -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.