Merge upstream master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1b8a13c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1b8a13c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1b8a13c3 Branch: refs/heads/master Commit: 1b8a13c315eb054efa40f25d6dddde85ce2672af Parents: 63dd6fa Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Apr 5 13:22:13 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Apr 5 13:22:13 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/s2jobs/S2GraphHelper.scala | 77 ++++++++++--- .../s2jobs/loader/GraphFileOptions.scala | 31 +++++ .../s2graph/s2jobs/loader/HFileGenerator.scala | 23 +++- .../s2jobs/loader/HFileMRGenerator.scala | 10 +- .../loader/LocalBulkLoaderTransformer.scala | 51 +++++++++ .../loader/SparkBulkLoaderTransformer.scala | 69 +++++++++++ .../s2jobs/serde/GraphElementWritable.scala | 4 + .../serde/LocalBulkLoaderTransformer.scala | 61 ---------- .../serde/SparkBulkLoaderTransformer.scala | 76 ------------- .../s2graph/s2jobs/serde/Transformer.scala | 21 +--- .../serde/reader/RowBulkFormatReader.scala | 33 ++++++ .../s2jobs/serde/writer/KeyValueWriter.scala | 24 +++- .../org/apache/s2graph/s2jobs/task/Sink.scala | 111 +++++++++++++++--- .../org/apache/s2graph/s2jobs/task/Task.scala | 9 ++ .../sql/streaming/S2StreamQueryWriter.scala | 50 +------- .../apache/s2graph/s2jobs/BaseSparkTest.scala | 15 +-- .../s2graph/s2jobs/S2GraphHelperTest.scala | 13 ++- .../s2jobs/loader/GraphFileGeneratorTest.scala | 113 ++++++++++++------- .../apache/s2graph/s2jobs/task/SinkTest.scala | 98 ++++++++++++++++ 19 files changed, 591 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala index 383f39f..6e68d28 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala @@ -24,10 +24,12 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.SKeyValue import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} -import org.apache.s2graph.s2jobs.loader.GraphFileOptions -import play.api.libs.json.Json +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType +import play.api.libs.json.{JsObject, Json} import scala.concurrent.ExecutionContext +import scala.util.Try object S2GraphHelper { def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = { @@ -55,8 +57,8 @@ object S2GraphHelper { } } - private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = { - val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge) + private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = { + val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) val snapshotEdgeKeyValues = 
s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues val indexEdgeKeyValues = relEdges.flatMap { edge => @@ -68,20 +70,59 @@ object S2GraphHelper { snapshotEdgeKeyValues ++ indexEdgeKeyValues } - def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = { - try { - if (element.isInstanceOf[S2Edge]) { - val edge = element.asInstanceOf[S2Edge] - insertBulkForLoaderAsync(s2, edge, option) - } else if (element.isInstanceOf[S2Vertex]) { - val vertex = element.asInstanceOf[S2Vertex] - s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues - } else { - Nil - } - } catch { - case e: Exception => - if (option.skipError) Nil else throw e + def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = { + if (element.isInstanceOf[S2Edge]) { + val edge = element.asInstanceOf[S2Edge] + insertBulkForLoaderAsync(s2, edge, autoEdgeCreate) + } else if (element.isInstanceOf[S2Vertex]) { + val vertex = element.asInstanceOf[S2Vertex] + s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues + } else { + Nil + } + } + + def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = { + val timestamp = row.getAs[Long]("timestamp") + val operation = Try(row.getAs[String]("operation")).getOrElse("insert") + val elem = Try(row.getAs[String]("elem")).getOrElse("e") + + val props: Map[String, Any] = Option(row.getAs[String]("props")) match { + case Some(propsStr:String) => + JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject]) + case None => + schema.fieldNames.flatMap { field => + if (!reservedColumn.contains(field)) { + Seq( + field -> getRowValAny(row, field) + ) + } else Nil + }.toMap } + + elem match { + case "e" | "edge" => + val from = getRowValAny(row, "from") + val to = getRowValAny(row, "to") + val label = row.getAs[String]("label") + val direction = Try(row.getAs[String]("direction")).getOrElse("out") + Some( + s2.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation) + ) + case "v" | "vertex" => + val id = getRowValAny(row, "id") + val serviceName = row.getAs[String]("service") + val columnName = row.getAs[String]("column") + Some( + s2.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation) + ) + case _ => + None + } + } + + private def getRowValAny(row:Row, fieldName:String):Any = { + val idx = row.fieldIndex(fieldName) + row.get(idx) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala index e855a32..f62189a 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala @@ -20,6 +20,12 @@ package org.apache.s2graph.s2jobs.loader object GraphFileOptions { + val OptionKeys = Set( + "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", "--dbUser", "--dbPassword", "--dbDriver", + "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", "--autoEdgeCreate", "--buildDegree", + "--skipError", "--incrementalLoad", "--method" + ) + val parser = new 
scopt.OptionParser[GraphFileOptions]("run") { opt[String]('i', "input").required().action( (x, c) => @@ -92,6 +98,10 @@ object GraphFileOptions { (inner.head, inner.last) }).toMap } + + def toLabelMappingString(labelMapping: Map[String, String]): String = + labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",") + } /** * Option case class for TransferToHFile. @@ -137,4 +147,25 @@ case class GraphFileOptions(input: String = "", "db.default.driver" -> dbDriver ) } + + def toCommand: Array[String] = + Array( + "--input", input, + "--tempDir", tempDir, + "--output", output, + "--zkQuorum", zkQuorum, + "--table", tableName, + "--dbUrl", dbUrl, + "--dbUser", dbUser, + "--dbPassword", dbPassword, + "--dbDriver", dbDriver, + "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString, + "--numRegions", numRegions.toString, + "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping), + "--autoEdgeCreate", autoEdgeCreate.toString, + "--buildDegree", buildDegree.toString, + "--skipError", skipError.toString, + "--incrementalLoad", incrementalLoad.toString, + "--method", method + ) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index 8ace94a..2b230c9 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -24,12 +24,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.ConnectionFactory import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat +import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat} import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName} +import org.apache.hadoop.util.ToolRunner import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement -import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -112,11 +114,22 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { override def generate(sc: SparkContext, config: Config, rdd: RDD[String], - _options: GraphFileOptions): Unit = { - val transformer = new SparkBulkLoaderTransformer(config, _options) + options: GraphFileOptions): Unit = { + val transformer = new SparkBulkLoaderTransformer(config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError) + val kvs = transformer.transform(rdd).flatMap(kvs => kvs) - HFileGenerator.generateHFile(sc, config, kvs, _options) + HFileGenerator.generateHFile(sc, config, kvs, options) + } + + def loadIncrementalHFiles(options: GraphFileOptions): Int = { + /* 
LoadIncrementHFiles */ + val hfileArgs = Array(options.output, options.tableName) + val hbaseConfig = HBaseConfiguration.create() + ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala index fd78718..9a2e81a 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala @@ -19,8 +19,6 @@ package org.apache.s2graph.s2jobs.loader -import java.util.UUID - import com.typesafe.config.Config import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -30,11 +28,11 @@ import org.apache.hadoop.hbase.io.compress.Compression import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.mapreduce.{GraphHFileOutputFormat, HFileOutputFormat2} import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} import org.apache.hadoop.mapreduce.{Job, Mapper} -import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -106,6 +104,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] { input: RDD[String], options: GraphFileOptions): RDD[KeyValue] = { val transformer = new SparkBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError) + transformer.transform(input).flatMap(kvs => kvs) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala new file mode 100644 index 0000000..d3ed6bc --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.loader + +import com.typesafe.config.Config +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer} +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} + +import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag + +class LocalBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] { + val s2: S2Graph = S2GraphHelper.initS2Graph(config) + + override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = { + val degrees = elements.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + }.groupBy(_._1).mapValues(_.map(_._2).sum) + + degrees.toSeq.map { case (degreeKey, count) => + writer.writeDegree(s2)(degreeKey, count) + } + } + + override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = { + val elements = input.flatMap(reader.read(s2)(_)) + val kvs = elements.map(writer.write(s2)(_)) + val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil + + kvs ++ degrees + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala new file mode 100644 index 0000000..eec69b9 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.loader + +import com.typesafe.config.Config +import org.apache.s2graph.core.GraphElement +import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer} +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +class SparkBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions) extends Transformer[RDD] { + + override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = { + val degrees = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + } + }.reduceByKey(_ + _) + + degrees.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map { case (degreeKey, count) => + writer.writeDegree(s2)(degreeKey, count) + } + } + } + + override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = { + val elements = input.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { line => + reader.read(s2)(line) + } + } + + val kvs = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map(writer.write(s2)(_)) + } + + if (options.buildDegree) kvs ++ buildDegrees(elements) + else kvs + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala index ae082d8..f71a9e8 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala @@ -20,7 +20,11 @@ package org.apache.s2graph.s2jobs.serde import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.DegreeKey trait GraphElementWritable[T] extends Serializable { + def write(s2: S2Graph)(element: GraphElement): T + + def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala deleted file mode 100644 index a185754..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.s2jobs.serde - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.KeyValue -import org.apache.s2graph.core.{GraphElement, S2Graph} -import org.apache.s2graph.s2jobs.loader.GraphFileOptions -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} - -import scala.concurrent.ExecutionContext - -class LocalBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] { - val s2: S2Graph = S2GraphHelper.initS2Graph(config) - - override val reader = new TsvBulkFormatReader - override val writer = new KeyValueWriter(options) - - override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_)) - - override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_)) - - override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = { - val degrees = elements.flatMap { element => - DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) - }.groupBy(_._1).mapValues(_.map(_._2).sum) - - degrees.toSeq.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) - } - } - - override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - val degrees = if (options.buildDegree) buildDegrees(elements) else Nil - - kvs ++ degrees - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala deleted file mode 100644 index 63f4e2c..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.s2jobs.serde - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.{KeyValue => HKeyValue} -import org.apache.s2graph.core.GraphElement -import org.apache.s2graph.s2jobs.loader.GraphFileOptions -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} -import org.apache.spark.rdd.RDD - -class SparkBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] { - val reader = new TsvBulkFormatReader - - val writer = new KeyValueWriter(options) - - override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap { line => - reader.read(s2)(line) - } - } - - override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map(writer.write(s2)(_)) - } - - override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { - val degrees = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap { element => - DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) - } - }.reduceByKey(_ + _) - - degrees.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) - } - } - } - - override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - if (options.buildDegree) kvs ++ buildDegrees(elements) - kvs - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala index 3902c63..99afa25 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala @@ -21,30 +21,21 @@ package org.apache.s2graph.s2jobs.serde import com.typesafe.config.Config import org.apache.s2graph.core.GraphElement -import org.apache.s2graph.s2jobs.loader.GraphFileOptions + +import scala.reflect.ClassTag /** * Define serialize/deserialize. * Source -> GraphElement * GraphElement -> Target * - * @tparam S : Source class. ex) String, RDF.Statement, ... - * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ... * @tparam M : Container type. ex) RDD, Seq, List, ... 
*/ -trait Transformer[S, T, M[_]] extends Serializable { +trait Transformer[M[_]] extends Serializable { val config: Config - val options: GraphFileOptions - - val reader: GraphElementReadable[S] - - val writer: GraphElementWritable[T] - - def read(input: M[S]): M[GraphElement] - - def write(elements: M[GraphElement]): M[T] - def buildDegrees(elements: M[GraphElement]): M[T] + def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T] - def transform(input: M[S]): M[T] + def transform[S: ClassTag, T: ClassTag](input: M[S]) + (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T] } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala new file mode 100644 index 0000000..2a15011 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.serde.reader + +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.S2GraphHelper +import org.apache.s2graph.s2jobs.serde.GraphElementReadable +import org.apache.spark.sql.Row + +class RowBulkFormatReader extends GraphElementReadable[Row] { + private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction") + + override def read(s2: S2Graph)(row: Row): Option[GraphElement] = + S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN) + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala index 22eee34..04c1b1d 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala @@ -21,14 +21,28 @@ package org.apache.s2graph.s2jobs.serde.writer import org.apache.hadoop.hbase.KeyValue import org.apache.s2graph.core.{GraphElement, S2Graph} -import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.s2graph.s2jobs.loader.GraphFileOptions +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} import org.apache.s2graph.s2jobs.serde.GraphElementWritable -class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] { +class KeyValueWriter(autoEdgeCreate: Boolean = false, + skipError: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] { override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = { - S2GraphHelper.toSKeyValues(s2, element, option).map { skv => - new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value) + try { + S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv => + new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value) + } + } catch { + case e: Exception => + if (skipError) Nil else throw e + } + } + + override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = { + try { + DegreeKey.toKeyValue(s2, degreeKey, count) + } catch { + case e: Exception => + if (skipError) Nil else throw e } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 3d5beb6..ab32fad 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -19,24 +19,34 @@ package org.apache.s2graph.s2jobs.task +import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} +import org.apache.s2graph.core.Management +import org.apache.s2graph.s2jobs.S2GraphHelper +import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} +import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import 
org.elasticsearch.spark.sql.EsSparkSQL +import scala.concurrent.Await +import scala.concurrent.duration.Duration + /** * Sink + * * @param queryName * @param conf */ -abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { +abstract class Sink(queryName: String, override val conf: TaskConf) extends Task { val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}" val DEFAULT_TRIGGER_INTERVAL = "10 seconds" - val FORMAT:String + val FORMAT: String - def preprocess(df:DataFrame):DataFrame = df + def preprocess(df: DataFrame): DataFrame = df - def write(inputDF: DataFrame):Unit = { + def write(inputDF: DataFrame): Unit = { val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) if (inputDF.isStreaming) writeStream(df.writeStream) @@ -50,7 +60,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { case "update" => OutputMode.Update() case "complete" => OutputMode.Complete() case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'") - OutputMode.Append() + OutputMode.Append() } val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL) val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION) @@ -88,9 +98,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { writer.save(outputPath) } - protected def repartition(df:DataFrame, defaultParallelism:Int) = { + protected def repartition(df: DataFrame, defaultParallelism: Int) = { conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match { - case Some(numOfPartitions:Int) => + case Some(numOfPartitions: Int) => if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions) else df.coalesce(numOfPartitions) case None => df @@ -100,14 +110,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { /** * KafkaSink + * * @param queryName * @param conf */ -class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic") + override val FORMAT: String = "kafka" - override def preprocess(df:DataFrame):DataFrame = { + override def preprocess(df: DataFrame): DataFrame = { import org.apache.spark.sql.functions._ logger.debug(s"${LOG_PREFIX} schema: ${df.schema}") @@ -118,7 +130,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { val columns = df.columns df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value")) - case format:String => + case format: String => if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. 
use default json format") df.selectExpr("to_json(struct(*)) AS value") } @@ -130,21 +142,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * FileSink + * * @param queryName * @param conf */ -class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("path", "format") + override val FORMAT: String = conf.options.getOrElse("format", "parquet") } /** * HiveSink + * * @param queryName * @param conf */ -class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("database", "table") + override val FORMAT: String = "hive" override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = { @@ -161,11 +177,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * ESSink + * * @param queryName * @param conf */ -class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port") + override val FORMAT: String = "es" override def write(inputDF: DataFrame): Unit = { @@ -182,14 +200,75 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * S2graphSink + * * @param queryName * @param conf */ -class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set() + override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" - override protected def writeBatch(writer: DataFrameWriter[Row]): Unit = - throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}") + private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = { + val options = TaskConf.toGraphFileOptions(conf) + val config = Management.toConfig(options.toConfigParams) + val input = df.rdd + + val transformer = new SparkBulkLoaderTransformer(config, options) + + implicit val reader = new RowBulkFormatReader + implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError) + + val kvs = transformer.transform(input) + + HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options) + + // finish bulk load by execute LoadIncrementHFile. 
+ if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options) + } + + private def writeBatchWithMutate(df:DataFrame):Unit = { + import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ + + import scala.collection.JavaConversions._ + + val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load()) + val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise()) + + val reader = new RowBulkFormatReader + + val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt + val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt + + df.foreachPartition { iters => + val config = ConfigFactory.parseString(serializedConfig) + val s2Graph = S2GraphHelper.initS2Graph(config) + + val responses = iters.grouped(groupedSize).flatMap { rows => + val elements = rows.flatMap(row => reader.read(s2Graph)(row)) + + val mutateF = s2Graph.mutateElements(elements, true) + Await.result(mutateF, Duration(waitTime, "seconds")) + } + + val (success, fail) = responses.toSeq.partition(r => r.isSuccess) + logger.info(s"success : ${success.size}, fail : ${fail.size}") + } + } + + override def write(inputDF: DataFrame): Unit = { + val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) + + if (inputDF.isStreaming) writeStream(df.writeStream) + else { + conf.options.getOrElse("writeMethod", "mutate") match { + case "mutate" => writeBatchWithMutate(df) + case "bulk" => + val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean + writeBatchBulkload(df, runLoadIncrementalHFiles) + case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)") + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala index e8f11e3..ddd56bf 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala @@ -20,7 +20,16 @@ package org.apache.s2graph.s2jobs.task import org.apache.s2graph.s2jobs.Logger +import org.apache.s2graph.s2jobs.loader.GraphFileOptions +object TaskConf { + def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { + val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys) + .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray + + GraphFileOptions.toOption(args) + } +} case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty) trait Task extends Serializable with Logger { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala index e15efe8..f6fecd7 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala +++ 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming import com.typesafe.config.ConfigFactory import org.apache.s2graph.core.{GraphElement, JSONParser} +import org.apache.s2graph.s2jobs.S2GraphHelper import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ import org.apache.spark.TaskContext import org.apache.spark.sql.Row @@ -83,51 +84,6 @@ private [sql] class S2StreamQueryWriter( } } - private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = { - val s2Graph = s2SinkContext.getGraph - val row = encoder.fromRow(internalRow) - - val timestamp = row.getAs[Long]("timestamp") - val operation = Try(row.getAs[String]("operation")).getOrElse("insert") - val elem = Try(row.getAs[String]("elem")).getOrElse("e") - - val props: Map[String, Any] = Option(row.getAs[String]("props")) match { - case Some(propsStr:String) => - JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject]) - case None => - schema.fieldNames.flatMap { field => - if (!RESERVED_COLUMN.contains(field)) { - Seq( - field -> getRowValAny(row, field) - ) - } else Nil - }.toMap - } - - elem match { - case "e" | "edge" => - val from = getRowValAny(row, "from") - val to = getRowValAny(row, "to") - val label = row.getAs[String]("label") - val direction = Try(row.getAs[String]("direction")).getOrElse("out") - Some( - s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation) - ) - case "v" | "vertex" => - val id = getRowValAny(row, "id") - val serviceName = row.getAs[String]("service") - val columnName = row.getAs[String]("column") - Some( - s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation) - ) - case _ => - logger.warn(s"'$elem' is not GraphElement. skipped!! 
(${row.toString()})") - None - } - } - - private def getRowValAny(row:Row, fieldName:String):Any = { - val idx = row.fieldIndex(fieldName) - row.get(idx) - } + private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = + S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala index 78000d4..0461d1e 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs import java.io.{File, PrintWriter} +import com.holdenkarau.spark.testing.DataFrameSuiteBase import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} import org.apache.s2graph.core.{Management, S2Graph} @@ -31,11 +32,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import scala.util.Try -class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll { - private val master = "local[2]" - private val appName = "example-spark" +class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { - protected var sc: SparkContext = _ protected val options = GraphFileOptions( input = "/tmp/test.txt", tempDir = "/tmp/bulkload_tmp", @@ -65,18 +63,15 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll { override def beforeAll(): Unit = { // initialize spark context. 
- val conf = new SparkConf() - .setMaster(master) - .setAppName(appName) - - sc = new SparkContext(conf) + super.beforeAll() s2 = S2GraphHelper.initS2Graph(s2Config) initTestDataFile } override def afterAll(): Unit = { - if (sc != null) sc.stop() + super.afterAll() + if (s2 != null) s2.shutdown() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala index 6e89466..f2b0102 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala @@ -19,6 +19,17 @@ package org.apache.s2graph.s2jobs -class S2GraphHelperTest { +import org.apache.s2graph.s2jobs.task.TaskConf +class S2GraphHelperTest extends BaseSparkTest { + test("toGraphFileOptions") { + val args = options.toCommand.grouped(2).map { kv => + kv.head -> kv.last + }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql") + + println(args) + val taskConf = TaskConf("dummy", "sink", Nil, args) + val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf) + println(graphFileOptions) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala index 3bd1a23..991897b 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala @@ -19,12 +19,16 @@ package org.apache.s2graph.s2jobs.loader -import org.apache.s2graph.core.PostProcess +import org.apache.s2graph.core.{PostProcess, S2VertexLike} import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue} import org.apache.s2graph.s2jobs.BaseSparkTest -import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer} +import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader} +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.spark.rdd.RDD import play.api.libs.json.Json +import scala.io.Source + class GraphFileGeneratorTest extends BaseSparkTest { import org.apache.hadoop.hbase.{KeyValue => HKeyValue} @@ -34,8 +38,12 @@ class GraphFileGeneratorTest extends BaseSparkTest { def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = { transformerMode match { case "spark" => - val input = sc.parallelize(edges) + val input: RDD[String] = sc.parallelize(edges) val transformer = new SparkBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(input) kvs.flatMap { kvs => kvs.map { kv => @@ -46,12 +54,43 @@ class GraphFileGeneratorTest extends BaseSparkTest { case "local" => val input = edges val transformer = new LocalBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = 
transformer.transform(input)
        kvs.flatMap { kvs =>
          kvs.map { kv =>
            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
          }
        }.toList
+
+      case "dataset" =>
+        import spark.sqlContext.implicits._
+        val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+        val rows = elements.map { e =>
+          (e.getTs(),
+            e.getOperation(),
+            "e",
+            e.srcVertex.innerIdVal.toString,
+            e.tgtVertex.innerIdVal.toString,
+            e.label(),
+            "{}",
+            e.getDirection())
+        }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
+
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new RowBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
+        val kvs = transformer.transform(rows)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
     }
   }
@@ -61,7 +100,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-    val transformerMode = "spark"
+    val transformerMode = "dataset"
     val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
     val serDe = s2.defaultStorage.serDe
@@ -143,42 +182,36 @@ class GraphFileGeneratorTest extends BaseSparkTest {
   }
   // this test case expects options.input to already exist in valid bulk load format.
-  // test("bulk load and fetch vertex: spark mode") {
-  //   import scala.collection.JavaConverters._
-  //   val serviceColumn = initTestVertexSchema(s2)
-  //
-  //   val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //   val input = sc.parallelize(bulkVertexLs)
-  //
-  //   HFileGenerator.generate(sc, s2Config, input, options)
-  //
-  //   val hfileArgs = Array(options.output, options.tableName)
-  //   val hbaseConfig = HBaseConfiguration.create()
-  //
-  //   val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //
-  //   val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //   val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //   println(Json.prettyPrint(json))
-  // }
+  test("bulk load and fetch vertex: spark mode") {
+    import scala.collection.JavaConverters._
+    val serviceColumn = initTestVertexSchema(s2)
+
+    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+    val input = sc.parallelize(bulkVertexLs)
+
+    HFileGenerator.generate(sc, s2Config, input, options)
+    HFileGenerator.loadIncrementalHFiles(options)
+
+    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+    val json = PostProcess.verticesToJson(s2Vertices)
+
+    println(Json.prettyPrint(json))
+  }
   // this test case expects options.input to already exist in valid bulk load format.
- // test("bulk load and fetch vertex: mr mode") { - // val serviceColumn = initTestVertexSchema(s2) - // - // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq - // val input = sc.parallelize(bulkVertexLs) - // - // HFileMRGenerator.generate(sc, s2Config, input, options) - // - // val hfileArgs = Array(options.output, options.tableName) - // val hbaseConfig = HBaseConfiguration.create() - // - // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) - // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) - // val json = PostProcess.verticesToJson(s2Vertices) - // - // println(Json.prettyPrint(json)) - // } +// test("bulk load and fetch vertex: mr mode") { +// import scala.collection.JavaConverters._ +// val serviceColumn = initTestVertexSchema(s2) +// +// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq +// val input = sc.parallelize(bulkVertexLs) +// +// HFileMRGenerator.generate(sc, s2Config, input, options) +// HFileGenerator.loadIncrementHFile(options) +// +// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) +// val json = PostProcess.verticesToJson(s2Vertices) +// +// println(Json.prettyPrint(json)) +// } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala new file mode 100644 index 0000000..47d4628 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.task + +import org.apache.s2graph.core.S2EdgeLike +import org.apache.s2graph.s2jobs.BaseSparkTest +import org.apache.spark.sql.DataFrame + +import scala.collection.JavaConverters._ + +class SinkTest extends BaseSparkTest { + override def beforeAll(): Unit = { + super.beforeAll() + initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) + } + def toDataFrame(edges: Seq[String]): DataFrame = { + import spark.sqlContext.implicits._ + val elements = edges.flatMap(s2.elementBuilder.toEdge(_)) + + elements.map { e => + (e.getTs(), + e.getOperation(), + "e", + e.srcVertex.innerIdVal.toString, + e.tgtVertex.innerIdVal.toString, + e.label(), + "{}", + e.getDirection()) + }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction") + } + + test("S2graphSink writeBatchWithBulkload") { + val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" + val df = toDataFrame(Seq(bulkEdgeString)) + val args = Map("writeMethod" -> "bulk") ++ + options.toCommand.grouped(2).map { kv => + kv.head -> kv.last + }.toMap + + val conf = TaskConf("test", "sql", Seq("input"), args) + + val sink = new S2graphSink("testQuery", conf) + sink.write(df) + + val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) + s2Edges.foreach { edge => println(edge) } + + val filteredEdges = s2Edges.filter{ edge => + edge.srcVertex.innerIdVal.toString == "a" && + edge.tgtVertex.innerIdVal.toString == "b" && + edge.label() == "friends" + } + + assert(filteredEdges.size == 1) + } + + test("S2graphSink writeBatchWithMutate") { + val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}" + val df = toDataFrame(Seq(bulkEdgeString)) + val args = Map("writeMethod" -> "mutate") ++ + options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap + + val conf = TaskConf("test", "sql", Seq("input"), args) + + val sink = new S2graphSink("testQuery", conf) + sink.write(df) + + val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) + s2Edges.foreach { edge => println(edge) } + + val filteredEdges = s2Edges.filter{ edge => + edge.srcVertex.innerIdVal.toString == "b" && + edge.tgtVertex.innerIdVal.toString == "c" && + edge.getTs() == 1416236400000L && + edge.label() == "friends" + } + + assert(filteredEdges.size == 1) + } + +}
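
Usage note (an illustrative sketch, not part of this commit): the new S2graphSink batch path added above can be driven from job code roughly as below. All endpoint values are placeholders, and the option map mirrors SinkTest: GraphFileOptions flags such as "--zkQuorum" are passed as plain task options and re-parsed by TaskConf.toGraphFileOptions, so "--input" must be supplied to satisfy the scopt parser even though the sink reads rows from the DataFrame instead.

  // Illustrative sketch only; endpoint values are placeholders for the
  // target HBase cluster and metastore.
  import org.apache.spark.sql.DataFrame
  import org.apache.s2graph.s2jobs.task.{S2graphSink, TaskConf}

  def writeEdgesBulk(df: DataFrame): Unit = {
    // df is expected to carry the bulk-format columns used by RowBulkFormatReader:
    // timestamp, operation, elem, from, to, label, props (and direction).
    val args = Map(
      "writeMethod" -> "bulk",              // or "mutate" for online writes
      "runLoadIncrementalHFiles" -> "true", // complete the bulk load after HFile generation
      "--input" -> "/tmp/unused",           // required by the scopt parser, unused by the sink
      "--tempDir" -> "/tmp/bulkload_tmp",
      "--output" -> "/tmp/s2graph_bulkload",
      "--zkQuorum" -> "localhost",
      "--table" -> "s2graph",
      "--dbUrl" -> "jdbc:mysql://localhost:3306/graph_dev",
      "--dbUser" -> "graph",
      "--dbPassword" -> "graph",
      "--dbDriver" -> "com.mysql.jdbc.Driver")

    val sink = new S2graphSink("exampleQuery", TaskConf("example", "sink", Seq("input"), args))
    sink.write(df)
  }

With writeMethod=mutate the same DataFrame is instead written online through S2Graph.mutateElements, as exercised by the "S2graphSink writeBatchWithMutate" test above.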