Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 3332f6bc1 -> 1799ae456
implement S2GraphSink.write method for writeBatch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5535ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5535ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5535ebc

Branch: refs/heads/master
Commit: b5535ebccf72cd71ba45c177dc1d4ea2adaf94b8
Parents: 3332f6b
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Apr 2 17:10:45 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Apr 2 17:10:45 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  57 ++++-
 .../SparkGraphElementLoaderTransformer.scala    |  75 +++++++
 .../serde/reader/RowBulkFormatReader.scala      |  14 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  25 ++-
 .../sql/streaming/S2StreamQueryWriter.scala     |  88 ++++----
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  13 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 216 ++++++++++---------
 7 files changed, 338 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..9eb9cc8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,9 +24,15 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import play.api.libs.json.Json
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
 
 import scala.concurrent.ExecutionContext
+import scala.util.Try
 
 object S2GraphHelper {
   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
@@ -78,4 +84,53 @@ object S2GraphHelper {
       Nil
     }
   }
+
+  //TODO:
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    GraphFileOptions()
+  }
+
+  def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+      case Some(propsStr: String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!reservedColumn.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
+    }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+        )
+      case _ =>
+        None
+    }
+  }
+
+  private def getRowValAny(row: Row, fieldName: String): Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
+  }
 }
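
For reference, a minimal sketch of how sparkSqlRowToGraphElement can be exercised on its own. The `s2` graph instance (e.g. obtained via S2GraphHelper.initS2Graph) is assumed here and is not part of this commit:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("timestamp", LongType),   // required
    StructField("operation", StringType), // optional, defaults to "insert"
    StructField("elem", StringType),      // optional, defaults to "e" (edge)
    StructField("from", StringType),
    StructField("to", StringType),
    StructField("label", StringType),
    StructField("props", StringType)      // optional JSON object string
  ))

  val row: Row = new GenericRowWithSchema(
    Array(1416236400000L, "insert", "e", "a", "b", "friends", """{"since":1316236400000}"""),
    schema)

  // `s2` is assumed to come from S2GraphHelper.initS2Graph(config).
  val element = S2GraphHelper.sparkSqlRowToGraphElement(
    s2, row, schema,
    Set("timestamp", "from", "to", "label", "operation", "elem", "direction"))
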

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
new file mode 100644
index 0000000..fcf8d4c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.Transformer
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+class SparkGraphElementLoaderTransformer(val config: Config,
+                                         val options: GraphFileOptions) extends Transformer[Row, Seq[HKeyValue], RDD] {
+  val reader = new RowBulkFormatReader
+
+  val writer = new KeyValueWriter
+
+  override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap(reader.read(s2)(_))
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    else kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
new file mode 100644
index 0000000..73e56ce
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.sql.Row
+
+class RowBulkFormatReader extends GraphElementReadable[Row] {
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+}
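
A hedged sketch of the intended flow through the new transformer; `spark`, `config`, and `options` (an active SparkSession, a Typesafe Config, and a GraphFileOptions) are assumed from the surrounding job and are not part of this commit:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.Row

  // Build an RDD[Row] with the reserved column layout the reader expects.
  val rows: RDD[Row] = spark
    .createDataFrame(Seq(
      (1416236400000L, "insert", "e", "a", "b", "friends", "{}", "out")))
    .toDF("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")
    .rdd

  val transformer = new SparkGraphElementLoaderTransformer(config, options)
  val kvs = transformer.transform(rows) // RDD[Seq[HKeyValue]], ready for HFileGenerator
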
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 3d5beb6..b7a91d9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,6 +19,12 @@ package org.apache.s2graph.s2jobs.task
 
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
@@ -189,7 +195,22 @@ class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf)
   override def mandatoryOptions: Set[String] = Set()
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
-    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      val config: Config = Management.toConfig(conf.options)
+      val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
+      val input = df.rdd
+
+      val transformer = new SparkGraphElementLoaderTransformer(config, bulkLoadOptions)
+      val kvs = transformer.transform(input)
+
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)
+    }
+  }
 }
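
So a non-streaming DataFrame now takes the HFile bulk-load path instead of throwing. A minimal usage sketch, assuming a TaskConf `sinkConf` whose options carry the S2Graph/HBase settings (the exact option keys are outside this diff):

  // `df` and `sinkConf` are assumed to exist; this is illustrative only.
  val sink = new S2graphSink("bulk-load-query", sinkConf)
  sink.write(df) // df.isStreaming == false, so this runs the new HFile branch
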
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index e15efe8..ac37533 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming
 
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
@@ -83,48 +84,8 @@
     }
   }
 
-  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
-    val s2Graph = s2SinkContext.getGraph
-    val row = encoder.fromRow(internalRow)
-
-    val timestamp = row.getAs[Long]("timestamp")
-    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-
-    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-      case Some(propsStr:String) =>
-        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-      case None =>
-        schema.fieldNames.flatMap { field =>
-          if (!RESERVED_COLUMN.contains(field)) {
-            Seq(
-              field -> getRowValAny(row, field)
-            )
-          } else Nil
-        }.toMap
-    }
-
-    elem match {
-      case "e" | "edge" =>
-        val from = getRowValAny(row, "from")
-        val to = getRowValAny(row, "to")
-        val label = row.getAs[String]("label")
-        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-        Some(
-          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-        )
-      case "v" | "vertex" =>
-        val id = getRowValAny(row, "id")
-        val serviceName = row.getAs[String]("service")
-        val columnName = row.getAs[String]("column")
-        Some(
-          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-        )
-      case _ =>
-        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-        None
-    }
-  }
+  private def rowToEdge(internalRow: InternalRow): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
 
   private def getRowValAny(row:Row, fieldName:String):Any = {
     val idx = row.fieldIndex(fieldName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 78000d4..4f02808 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
 
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
@@ -31,11 +32,10 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
 
-class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
   private val master = "local[2]"
   private val appName = "example-spark"
 
-  protected var sc: SparkContext = _
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",
@@ -65,18 +65,15 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     // initialize spark context.
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
+    super.beforeAll()
 
     s2 = S2GraphHelper.initS2Graph(s2Config)
 
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
-    if (sc != null) sc.stop()
+    super.afterAll()
+
     if (s2 != null) s2.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..baf9b32 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,24 +19,22 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
-import scala.io.Source
-
 class GraphFileGeneratorTest extends BaseSparkTest {
-  import scala.concurrent.ExecutionContext.Implicits.global
+  import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
+  import scala.concurrent.ExecutionContext.Implicits.global
+
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
-        val input = sc.parallelize(edges)
+        val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
@@ -54,39 +52,39 @@ class GraphFileGeneratorTest extends BaseSparkTest {
           CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
         }
       }.toList
-    }
-  }
 
-  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-    /* end of initialize model */
-
-    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
-    val indexEdges = ls.flatMap { kv =>
-      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+      case "dataset" =>
+        import spark.sqlContext.implicits._
+        val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+        val rows = elements.map { e =>
+          (e.getTs(),
+            e.getOperation(),
+            "e",
+            e.srcVertex.innerIdVal.toString,
+            e.tgtVertex.innerIdVal.toString,
+            e.label(),
+            "{}",
+            e.getDirection())
+        }.toDF("timestamp", "operation", "elem", "from", "to", "label", "props", "direction").rdd
+
+        val transformer = new SparkGraphElementLoaderTransformer(s2Config, options)
+        val kvs = transformer.transform(rows)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
     }
-
-    val indexEdge = indexEdges.head
-
-    println(indexEdge)
-    println(bulkEdge)
-
-    bulkEdge shouldBe (indexEdge)
   }
-  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+
+  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     /* end of initialize model */
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
-    val transformerMode = "local"
+    val transformerMode = "dataset"
     val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
     val serDe = s2.defaultStorage.serDe
@@ -104,82 +102,106 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     bulkEdge shouldBe (indexEdge)
   }
-
-  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-
-    val transformerMode = "local"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-// this test case expect options.input already exist with valid bulk load format.
-// test("bulk load and fetch vertex: spark mode") {
-//   import scala.collection.JavaConverters._
-//   val serviceColumn = initTestVertexSchema(s2)
-//
-//   val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//   val input = sc.parallelize(bulkVertexLs)
-//
-//   HFileGenerator.generate(sc, s2Config, input, options)
-//
-//   val hfileArgs = Array(options.output, options.tableName)
-//   val hbaseConfig = HBaseConfiguration.create()
-//
-//   val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//
-//   val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//   val json = PostProcess.verticesToJson(s2Vertices)
-//
-//   println(Json.prettyPrint(json))
-// }
-
-// this test case expect options.input already exist with valid bulk load format.
-// test("bulk load and fetch vertex: mr mode") {
+//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+//    /* end of initialize model */
+//
+//    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+//
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
+//
+//    val serDe = s2.defaultStorage.serDe
+//
+//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+//
+//    val indexEdges = ls.flatMap { kv =>
+//      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+//    }
+//
+//    val indexEdge = indexEdges.head
+//
+//    println(indexEdge)
+//    println(bulkEdge)
+//
+//    bulkEdge shouldBe (indexEdge)
+//  }
+//
-//   val serviceColumn = initTestVertexSchema(s2)
-//
-//   val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//   val input = sc.parallelize(bulkVertexLs)
-//
-//   HFileMRGenerator.generate(sc, s2Config, input, options)
-//
-//   val hfileArgs = Array(options.output, options.tableName)
-//   val hbaseConfig = HBaseConfiguration.create()
-//
-//   val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//   val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//   val json = PostProcess.verticesToJson(s2Vertices)
-//
-//   println(Json.prettyPrint(json))
-// }
+//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+//    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+//
+//    val transformerMode = "spark"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//
+//    val serDe = s2.defaultStorage.serDe
+//
+//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
+//
+//    bulkVertex shouldBe (vertex)
+//  }
+//
+//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
+//    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+//
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//
+//    val serDe = s2.defaultStorage.serDe
+//
+//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
+//
+//    bulkVertex shouldBe (vertex)
+//  }
+
+  // this test case expects options.input to already exist, in valid bulk load format.
+  // test("bulk load and fetch vertex: spark mode") {
+  //   import scala.collection.JavaConverters._
+  //   val serviceColumn = initTestVertexSchema(s2)
+  //
+  //   val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //   val input = sc.parallelize(bulkVertexLs)
+  //
+  //   HFileGenerator.generate(sc, s2Config, input, options)
+  //
+  //   val hfileArgs = Array(options.output, options.tableName)
+  //   val hbaseConfig = HBaseConfiguration.create()
+  //
+  //   val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //
+  //   val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //   val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //   println(Json.prettyPrint(json))
+  // }
+
+  // this test case expects options.input to already exist, in valid bulk load format.
+  // test("bulk load and fetch vertex: mr mode") {
+  //   val serviceColumn = initTestVertexSchema(s2)
+  //
+  //   val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //   val input = sc.parallelize(bulkVertexLs)
+  //
+  //   HFileMRGenerator.generate(sc, s2Config, input, options)
+  //
+  //   val hfileArgs = Array(options.output, options.tableName)
+  //   val hbaseConfig = HBaseConfiguration.create()
+  //
+  //   val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //   val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //   val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //   println(Json.prettyPrint(json))
+  // }
 }
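
As the commented-out tests above suggest, the HFiles produced under options.output can then be handed to HBase's incremental bulk loader; a minimal sketch, assuming the same `options` as in the tests:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  import org.apache.hadoop.util.ToolRunner

  val hfileArgs = Array(options.output, options.tableName)
  val hbaseConfig = HBaseConfiguration.create()

  // exit code 0 means every HFile was moved into the target table's regions
  val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
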