merge S2GRAPH-249
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f41ab569 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f41ab569 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f41ab569 Branch: refs/heads/master Commit: f41ab569345e9180e8f66355422bbeb7548f0670 Parents: 1dda954 Author: daewon <dae...@apache.org> Authored: Thu Nov 29 18:55:39 2018 +0900 Committer: daewon <dae...@apache.org> Committed: Thu Nov 29 18:55:39 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/http/PlayJsonSupport.scala | 42 ++++++++- .../apache/s2graph/http/S2GraphAdminRoute.scala | 95 ++++++-------------- .../s2graph/http/S2GraphMutateRoute.scala | 55 ++++-------- .../apache/s2graph/http/AdminRouteSpec.scala | 4 +- .../apache/s2graph/http/MutateRouteSpec.scala | 8 +- 5 files changed, 91 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f41ab569/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala b/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala index 8b4e91c..244e588 100644 --- a/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala +++ b/s2http/src/main/scala/org/apache/s2graph/http/PlayJsonSupport.scala @@ -6,7 +6,7 @@ import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} import akka.util.ByteString -import play.api.libs.json.{JsValue, Json} +import play.api.libs.json._ trait PlayJsonSupport { @@ -31,4 +31,44 @@ trait PlayJsonSupport { case data => Json.parse(data.decodeString(Charset.forName("UTF-8"))) } } + + trait ToPlayJson[T] { + def toJson(msg: T): JsValue + } + + import scala.language.reflectiveCalls + + object ToPlayJson { + type ToPlayJsonReflective = { + def toJson: JsValue + } + + implicit def forToJson[A <: ToPlayJsonReflective] = new ToPlayJson[A] { + def toJson(js: A) = js.toJson + } + + implicit def forPlayJson[A <: JsValue] = new ToPlayJson[A] { + def toJson(js: A) = js + } + } + + implicit object JsErrorJsonWriter extends Writes[JsError] { + def writes(o: JsError): JsValue = Json.obj( + "errors" -> JsArray( + o.errors.map { + case (path, validationErrors) => Json.obj( + "path" -> Json.toJson(path.toString()), + "validationErrors" -> JsArray(validationErrors.map(validationError => Json.obj( + "message" -> JsString(validationError.message), + "args" -> JsArray(validationError.args.map { + case x: Int => JsNumber(x) + case x => JsString(x.toString) + }) + ))) + ) + } + ) + ) + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f41ab569/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala index 9bf7eb4..47ac86a 100644 --- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala +++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala @@ -12,30 +12,9 @@ import play.api.libs.json._ import scala.util._ -object S2GraphAdminRoute { - - trait AdminMessageFormatter[T] { - def toJson(msg: T): JsValue - } - - import scala.language.reflectiveCalls - - object AdminMessageFormatter { - type ToPlayJson = { - def toJson: JsValue - } - - implicit def toPlayJson[A <: ToPlayJson] = new AdminMessageFormatter[A] { - def toJson(js: A) = js.toJson - } - - implicit def fromPlayJson[T <: JsValue] = new AdminMessageFormatter[T] { - def toJson(js: T) = js - } - } - - def toHttpEntity[A: AdminMessageFormatter](opt: Option[A], status: StatusCode = StatusCodes.OK, message: String = ""): HttpResponse = { - val ev = implicitly[AdminMessageFormatter[A]] +object S2GraphAdminRoute extends PlayJsonSupport { + def toHttpEntity[A: ToPlayJson](opt: Option[A], status: StatusCode = StatusCodes.OK, message: String = ""): HttpResponse = { + val ev = implicitly[ToPlayJson[A]] val res = opt.map(ev.toJson).getOrElse(Json.obj("message" -> message)) HttpResponse( @@ -44,9 +23,9 @@ object S2GraphAdminRoute { ) } - def toHttpEntity[A: AdminMessageFormatter](opt: Try[A]): HttpResponse = { - val ev = implicitly[AdminMessageFormatter[A]] - val (status, res) = opt match { + def toHttpEntity[A: ToPlayJson](_try: Try[A]): HttpResponse = { + val ev = implicitly[ToPlayJson[A]] + val (status, res) = _try match { case Success(m) => StatusCodes.Created -> Json.obj("status" -> "ok", "message" -> ev.toJson(m)) case Failure(e) => StatusCodes.OK -> Json.obj("status" -> "failure", "message" -> e.toString) } @@ -54,9 +33,9 @@ object S2GraphAdminRoute { toHttpEntity(Option(res), status = status) } - def toHttpEntity[A: AdminMessageFormatter](ls: Seq[A], status: StatusCode = StatusCodes.OK): HttpResponse = { - val ev = implicitly[AdminMessageFormatter[A]] - val res = ls.map(ev.toJson) + def toHttpEntity[A: ToPlayJson](ls: Seq[A], status: StatusCode): HttpResponse = { + val ev = implicitly[ToPlayJson[A]] + val res = JsArray(ls.map(ev.toJson)) HttpResponse( status = status, @@ -85,11 +64,7 @@ trait S2GraphAdminRoute extends PlayJsonSupport { } // GET /graphs/getServiceColumn/:serviceName/:columnName - lazy val getServiceColumn = path("getServiceColumn" / Segments) { params => - val (serviceName, columnName) = params match { - case s :: c :: Nil => (s, c) - } - + lazy val getServiceColumn = path("getServiceColumn" / Segment / Segment) { (serviceName, columnName) => val ret = Management.findServiceColumn(serviceName, columnName) complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}")) } @@ -105,7 +80,7 @@ trait S2GraphAdminRoute extends PlayJsonSupport { lazy val getLabels = path("getLabels" / Segment) { serviceName => val ret = Management.findLabels(serviceName) - complete(toHttpEntity(ret)) + complete(toHttpEntity(ret, StatusCodes.OK)) } /* POST */ @@ -175,6 +150,7 @@ trait S2GraphAdminRoute extends PlayJsonSupport { val (serviceName, columnName, storeInGlobalIndex) = params match { case s :: c :: Nil => (s, c, false) case s :: c :: i :: Nil => (s, c, i.toBoolean) + case _ => throw new RuntimeException("Invalid Params") } entity(as[JsValue]) { params => @@ -199,45 +175,33 @@ trait S2GraphAdminRoute extends PlayJsonSupport { hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)) - complete(toHttpEntity(None, status = StatusCodes.OK, message = "created")) + complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.OK, message = "created")) } - case err@JsError(_) => complete(toHttpEntity(None, status = StatusCodes.BadRequest, message = Json.toJson(err).toString)) + case err@JsError(_) => complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.BadRequest, message = Json.toJson(err).toString)) } } } // POST /graphs/copyLabel/:oldLabelName/:newLabelName - lazy val copyLabel = path("copyLabel" / Segments) { params => - val (oldLabelName, newLabelName) = params match { - case oldLabel :: newLabel :: Nil => (oldLabel, newLabel) - } - + lazy val copyLabel = path("copyLabel" / Segment / Segment) { (oldLabelName, newLabelName) => val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) complete(toHttpEntity(copyTry)) } // POST /graphs/renameLabel/:oldLabelName/:newLabelName - lazy val renameLabel = path("renameLabel" / Segments) { params => - val (oldLabelName, newLabelName) = params match { - case oldLabel :: newLabel :: Nil => (oldLabel, newLabel) - } - + lazy val renameLabel = path("renameLabel" / Segment / Segment) { (oldLabelName, newLabelName) => Label.findByName(oldLabelName) match { - case None => complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label $oldLabelName not found.")) + case None => complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.NotFound, message = s"Label $oldLabelName not found.")) case Some(label) => Management.updateLabelName(oldLabelName, newLabelName) - complete(toHttpEntity(None, message = s"${label} was updated.")) + complete(toHttpEntity(None: Option[JsValue], message = s"${label} was updated.")) } } // POST /graphs/swapLabels/:leftLabelName/:rightLabelName - lazy val swapLabel = path("swapLabel" / Segments) { params => - val (leftLabelName, rightLabelName) = params match { - case left :: right :: Nil => (left, right) - } - + lazy val swapLabel = path("swapLabel" / Segment / Segment) { (leftLabelName, rightLabelName) => val left = Label.findByName(leftLabelName, useCache = false) val right = Label.findByName(rightLabelName, useCache = false) // verify same schema @@ -245,20 +209,15 @@ trait S2GraphAdminRoute extends PlayJsonSupport { (left, right) match { case (Some(l), Some(r)) => Management.swapLabelNames(leftLabelName, rightLabelName) - - complete(toHttpEntity(None, message = s"Labels were swapped.")) + complete(toHttpEntity(None: Option[JsValue], message = s"Labels were swapped.")) case _ => - complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found.")) + complete(toHttpEntity(None: Option[JsValue], status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found.")) } } // POST /graphs/updateHTable/:labelName/:newHTableName - lazy val updateHTable = path("updateHTable" / Segments) { params => - val (labelName, newHTableName) = params match { - case l :: h :: Nil => (l, h) - } - - val updateTry = Management.updateHTable(labelName, newHTableName) + lazy val updateHTable = path("updateHTable" / Segment / Segment) { (labelName, newHTableName) => + val updateTry = Management.updateHTable(labelName, newHTableName).map(Json.toJson(_)) complete(toHttpEntity(updateTry)) } @@ -273,17 +232,13 @@ trait S2GraphAdminRoute extends PlayJsonSupport { // PUT /graphs/markDeletedLabel/:labelName lazy val markDeletedLabel = path("markDeletedLabel" / Segment) { labelName => - val ret = Management.markDeletedLabel(labelName).toOption + val ret = Management.markDeletedLabel(labelName).toOption.map(Json.toJson(_)) complete(toHttpEntity(ret, message = s"Label not found: ${labelName}")) } // PUT /graphs/deleteServiceColumn/:serviceName/:columnName - lazy val deleteServiceColumn = path("deleteServiceColumn" / Segments) { params => - val (serviceName, columnName) = params match { - case s :: c :: Nil => (s, c) - } - + lazy val deleteServiceColumn = path("deleteServiceColumn" / Segment / Segment) { (serviceName, columnName) => val ret = Management.deleteColumn(serviceName, columnName).toOption complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f41ab569/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala index fc3b768..a65db5a 100644 --- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala +++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala @@ -12,56 +12,40 @@ import play.api.libs.json.{JsValue, Json} import scala.concurrent.{ExecutionContext, Future} -trait S2GraphMutateRoute { +trait S2GraphMutateRoute extends PlayJsonSupport { val s2graph: S2Graph val logger = LoggerFactory.getLogger(this.getClass) + lazy val parser = new RequestParser(s2graph) // lazy val requestParser = new RequestParser(s2graph) lazy val exceptionHandler = ExceptionHandler { - case ex: JsonParseException => - complete(StatusCodes.BadRequest -> ex.getMessage) - case ex: java.lang.IllegalArgumentException => - complete(StatusCodes.BadRequest -> ex.getMessage) + case ex: JsonParseException => complete(StatusCodes.BadRequest -> ex.getMessage) + case ex: java.lang.IllegalArgumentException => complete(StatusCodes.BadRequest -> ex.getMessage) } lazy val mutateVertex = path("vertex" / Segments) { params => + implicit val ec = s2graph.ec + val (operation, serviceNameOpt, columnNameOpt) = params match { - case operation :: serviceName :: columnName :: Nil => - (operation, Option(serviceName), Option(columnName)) - case operation :: Nil => - (operation, None, None) + case operation :: serviceName :: columnName :: Nil => (operation, Option(serviceName), Option(columnName)) + case operation :: Nil => (operation, None, None) + case _ => throw new RuntimeException("invalid params") } - entity(as[String]) { body => - val payload = Json.parse(body) - - implicit val ec = s2graph.ec - - val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map { mutateResponses => - HttpResponse( - status = StatusCodes.OK, - entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString) - ) - } + entity(as[JsValue]) { payload => + val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map(Json.toJson(_)) complete(future) } } lazy val mutateEdge = path("edge" / Segment) { operation => - entity(as[String]) { body => - val payload = Json.parse(body) - - implicit val ec = s2graph.ec + implicit val ec = s2graph.ec - val future = edgeMutate(payload, operation, withWait = true).map { mutateResponses => - HttpResponse( - status = StatusCodes.OK, - entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString) - ) - } + entity(as[JsValue]) { payload => + val future = edgeMutate(payload, operation, withWait = true).map(Json.toJson(_)) complete(future) } @@ -79,13 +63,10 @@ trait S2GraphMutateRoute { s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess)) } - def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)], - withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { val elementWithIdxs = elementsWithTsv.zipWithIndex + val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => !element.isAsync } - val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => - !element.isAsync - } val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success) val (elementsToStore, _) = elementSync.map(_._1).unzip val elementsIdxToStore = elementSync.map(_._2) @@ -95,9 +76,7 @@ trait S2GraphMutateRoute { }.map(_.sortBy(_._1).map(_._2.isSuccess)) } - def edgeMutate(jsValue: JsValue, - operation: String, - withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + def edgeMutate(jsValue: JsValue, operation: String, withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { val edgesWithTsv = parser.parseJsonFormat(jsValue, operation) edgeMutate(edgesWithTsv, withWait) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f41ab569/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala ---------------------------------------------------------------------- diff --git a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala index eade7e6..26a7045 100644 --- a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala +++ b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala @@ -32,7 +32,7 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal "compressionAlgorithm" -> "gz" ) - val serviceEntity = Marshal(serviceParam.toString).to[MessageEntity].futureValue + val serviceEntity = Marshal(serviceParam).to[MessageEntity].futureValue val request = Post("/createService").withEntity(serviceEntity) request ~> routes ~> check { @@ -71,7 +71,7 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal ) ) - val serviceColumnEntity = Marshal(serviceColumnParam.toString).to[MessageEntity].futureValue + val serviceColumnEntity = Marshal(serviceColumnParam).to[MessageEntity].futureValue val request = Post("/createServiceColumn").withEntity(serviceColumnEntity) request ~> routes ~> check { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f41ab569/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala ---------------------------------------------------------------------- diff --git a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala index f823cd5..943db98 100644 --- a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala +++ b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala @@ -25,8 +25,11 @@ class MutateRouteSpec extends WordSpec with Matchers with PlayJsonSupport with S val columnName = "userName" "MutateRoute" should { + "be able to insert vertex (POST /mutate/vertex/insert)" in { - // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}} + s2graph.management.createService(serviceName, "localhost", s"${serviceName}-dev", 1, None) + + // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}} val param = Json.obj( "timestamp" -> 10, "serviceName" -> serviceName, @@ -36,7 +39,8 @@ class MutateRouteSpec extends WordSpec with Matchers with PlayJsonSupport with S "age" -> 20 ) ) - val entity = Marshal(param.toString).to[MessageEntity].futureValue + + val entity = Marshal(param).to[MessageEntity].futureValue val request = Post("/vertex/insert").withEntity(entity) request ~> routes ~> check {