test cases passed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f90a7f54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f90a7f54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f90a7f54 Branch: refs/heads/master Commit: f90a7f54401f67130360ca711a3378a726aeac4b Parents: e42f7b2 Author: DO YUNG YOON <steams...@apache.org> Authored: Fri Jun 15 05:04:35 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Fri Jun 15 05:04:35 2018 +0900 ---------------------------------------------------------------------- .../s2graph/s2jobs/loader/HFileGenerator.scala | 4 ++-- .../org/apache/s2graph/s2jobs/task/Sink.scala | 2 +- .../org/apache/s2graph/s2jobs/task/Source.scala | 25 ++++++++++++-------- .../org/apache/s2graph/s2jobs/task/Task.scala | 21 ++++++++-------- .../spark/sql/streaming/S2SinkConfigs.scala | 14 +++-------- .../spark/sql/streaming/S2SourceConfigs.scala | 21 ++++++++++++++++ .../apache/s2graph/s2jobs/BaseSparkTest.scala | 22 ++++++++++++++--- .../s2graph/s2jobs/S2GraphHelperTest.scala | 20 ++++++++-------- .../apache/s2graph/s2jobs/task/SinkTest.scala | 7 +----- .../apache/s2graph/s2jobs/task/SourceTest.scala | 25 ++++++++++---------- 10 files changed, 95 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index 1818d10..3d3821e 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.{Base64, Bytes} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.util.ToolRunner +import org.apache.s2graph.core.S2GraphConfigs import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter @@ -55,7 +56,7 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = { val hbaseConf = HBaseConfiguration.create() - hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) + hbaseConf.set(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM, zkQuorum) hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) hbaseConf @@ -164,7 +165,6 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { restorePath: String, tableNames: Seq[String], columnFamily: String = "e", - elementType: String = "IndexEdge", batchSize: Int = 1000, labelMapping: Map[String, String] = Map.empty, buildDegree: Boolean = false): RDD[Seq[Cell]] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 8c88ee8..6a84e00 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -299,7 +299,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con if (inputDF.isStreaming) writeStream(df.writeStream) else { - conf.options.getOrElse("writeMethod", "mutate") match { + conf.options.getOrElse(S2_SINK_WRITE_METHOD, "mutate") match { case "mutate" => writeBatchWithMutate(df) case "bulk" => val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index 9c80e58..e848ab3 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -112,28 +112,33 @@ class HiveSource(conf:TaskConf) extends Source(conf) { } class S2GraphSource(conf: TaskConf) extends Source(conf) { - - override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names") + import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._ + override def mandatoryOptions: Set[String] = Set( + S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR, + S2_SOURCE_BULKLOAD_RESTORE_PATH, + S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES + ) override def toDF(ss: SparkSession): DataFrame = { val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ TaskConf.parseLocalCacheConfigs(conf) val config = Management.toConfig(mergedConf) - val snapshotPath = conf.options("hbase.rootdir") - val restorePath = conf.options("restore.path") - val tableNames = conf.options("hbase.table.names").split(",") - val columnFamily = conf.options.getOrElse("hbase.table.cf", "e") - val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt + val snapshotPath = conf.options(S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR) + val restorePath = conf.options(S2_SOURCE_BULKLOAD_RESTORE_PATH) + val tableNames = conf.options(S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES).split(",") + val columnFamily = conf.options.getOrElse(S2_SOURCE_BULKLOAD_HBASE_TABLE_CF, "e") + val batchSize = conf.options.getOrElse(S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE, "1000").toInt + val labelMapping = Map.empty[String, String] val buildDegree = if (columnFamily == "v") false - else conf.options.getOrElse("build.degree", "false").toBoolean - val elementType = conf.options.getOrElse("element.type", "IndexEdge") + else conf.options.getOrElse(S2_SOURCE_BULKLOAD_BUILD_DEGREE, "false").toBoolean + val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge") val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath, - restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree) + restorePath, tableNames, columnFamily, batchSize, labelMapping, buildDegree) implicit val reader = new S2GraphCellReader(elementType) implicit val writer = new RowDataFrameWriter http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala index ea42828..51e13f0 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala @@ -19,27 +19,28 @@ package org.apache.s2graph.s2jobs.task +import org.apache.s2graph.core.S2GraphConfigs import org.apache.s2graph.s2jobs.Logger -import org.apache.s2graph.s2jobs.loader.GraphFileOptions +//import org.apache.s2graph.s2jobs.loader.GraphFileOptions object TaskConf { - def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { - val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys) - .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray - - GraphFileOptions.toOption(args) - } +// def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { +// val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys) +// .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray +// +// GraphFileOptions.toOption(args) +// } def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = { - taskConf.options.filterKeys(_.startsWith("hbase.")) + taskConf.options.filterKeys(S2GraphConfigs.HBaseConfigs.DEFAULTS.keySet) } def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = { - taskConf.options.filterKeys(_.startsWith("db.")) + taskConf.options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet) } def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = { - taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt) + taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala index d778478..561a810 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala @@ -24,23 +24,14 @@ import com.typesafe.config.Config import scala.util.Try object S2SinkConfigs { - // Common - - // meta storage Configurations. -// val DB_DEFAULT_DRIVER = "db.default.driver" -// val DB_DEFAULT_URL = "db.default.url" -// val DB_DEFAULT_PASSWORD = "db.default.password" -// val DB_DEFAULT_USER = "db.default.user" -// -// val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum" - - val DEFAULT_GROUPED_SIZE = "100" val DEFAULT_WAIT_TIME_SECONDS = "5" val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink" val S2_SINK_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.sink" + val S2_SINK_WRITE_METHOD = s"$S2_SINK_PREFIX.writeMethod" + val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname" val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath" val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation" @@ -58,6 +49,7 @@ object S2SinkConfigs { val S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD = s"$S2_SINK_BULKLOAD_PREFIX.hbase.incrementalLoad" val S2_SINK_BULKLOAD_HBASE_COMPRESSION = s"$S2_SINK_BULKLOAD_PREFIX.hbase.compression" + // val S2_SINK_BULKLOAD_AUTO_EDGE_CREATE= s"$S2_SINK_BULKLOAD_PREFIX.auto.edge.create" val S2_SINK_BULKLOAD_BUILD_DEGREE = s"$S2_SINK_BULKLOAD_PREFIX.build.degree" val S2_SINK_BULKLOAD_LABEL_MAPPING = s"$S2_SINK_BULKLOAD_PREFIX.label.mapping" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala new file mode 100644 index 0000000..d4553b0 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala @@ -0,0 +1,21 @@ +package org.apache.s2graph.spark.sql.streaming + +object S2SourceConfigs { + val S2_SOURCE_PREFIX = "s2.spark.sql.streaming.source" + val S2_SOURCE_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.source" + + // + // vertex/indexedge/snapshotedge + val S2_SOURCE_ELEMENT_TYPE = s"$S2_SOURCE_PREFIX.element.type" + + // HBASE HFILE BULK + val S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.rootdir" + val S2_SOURCE_BULKLOAD_RESTORE_PATH = s"$S2_SOURCE_BULKLOAD_PREFIX.restore.path" + val S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.names" + val S2_SOURCE_BULKLOAD_HBASE_TABLE_CF = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.cf" + val S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE = s"$S2_SOURCE_BULKLOAD_PREFIX.scan.batch.size" + + // BULKLOAD + val S2_SOURCE_BULKLOAD_BUILD_DEGREE = s"$S2_SOURCE_BULKLOAD_PREFIX.build.degree" + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala index f97dce4..d30b7a3 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala @@ -35,11 +35,27 @@ import scala.util.Try class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { import org.apache.s2graph.core.S2GraphConfigs._ - import S2SinkConfigs._ + protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", options = Map( - DBConfigs.DEFAULT_DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL" + DBConfigs.DB_DEFAULT_DRIVER -> "org.h2.Driver", + DBConfigs.DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL", + DBConfigs.DB_DEFAULT_USER -> "sa", + DBConfigs.DB_DEFAULT_PASSWORD -> "sa", + HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "localhost", + + S2SinkConfigs.S2_SINK_WRITE_METHOD -> "bulk", + + S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TABLE_NAME -> "s2graph", + S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_NUM_REGIONS -> "3", + S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TEMP_DIR -> "/tmp/bulkload_tmp", + S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD -> "true", + S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_COMPRESSION -> "NONE", + + S2SinkConfigs.S2_SINK_BULKLOAD_AUTO_EDGE_CREATE -> "false", + S2SinkConfigs.S2_SINK_BULKLOAD_BUILD_DEGREE -> "false", + S2SinkConfigs.S2_SINK_BULKLOAD_LABEL_MAPPING -> "" )) - protected val options = GraphFileOptions( + protected val options: GraphFileOptions = GraphFileOptions( input = "/tmp/test.txt", tempDir = "/tmp/bulkload_tmp", output = "/tmp/s2graph_bulkload", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala index 6b21cfc..0b2f5fa 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala @@ -22,14 +22,14 @@ package org.apache.s2graph.s2jobs import org.apache.s2graph.s2jobs.task.TaskConf class S2GraphHelperTest extends BaseSparkTest { - test("toGraphFileOptions") { - val args = options.toCommand.grouped(2).map { kv => - kv.head -> kv.last - }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql") - - println(args) - val taskConf = TaskConf("dummy", "sink", Nil, args) - val graphFileOptions = TaskConf.toGraphFileOptions(taskConf) - println(graphFileOptions) - } +// test("toGraphFileOptions") { +// val args = options.toCommand.grouped(2).map { kv => +// kv.head -> kv.last +// }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql") +// +// println(args) +// val taskConf = TaskConf("dummy", "sink", Nil, args) +// val graphFileOptions = TaskConf.toGraphFileOptions(taskConf) +// println(graphFileOptions) +// } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala index c4a9033..3918c84 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala @@ -50,12 +50,7 @@ class SinkTest extends BaseSparkTest { test("S2graphSink writeBatchWithBulkload") { val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" val df = toDataFrame(Seq(bulkEdgeString)) - val args = Map("writeMethod" -> "bulk") ++ - options.toCommand.grouped(2).map { kv => - kv.head -> kv.last - }.toMap - - val conf = TaskConf("test", "sql", Seq("input"), args) + val conf = bulkloadOptions val sink = new S2GraphSink("testQuery", conf) sink.write(df) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f90a7f54/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala index 4715bd6..0c2cbb7 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala @@ -49,13 +49,7 @@ class SourceTest extends BaseSparkTest { val input = df.collect().flatMap(reader.read(s2)(_)) - val args = options.toCommand.grouped(2).map { kv => - kv.head -> kv.last - }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true") - - val conf = TaskConf("test", "sql", Seq("input"), args) - - val sink = new S2GraphSink("testQuery", conf) + val sink = new S2GraphSink("testQuery", bulkloadOptions) sink.write(df) // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat. @@ -70,15 +64,20 @@ class SourceTest extends BaseSparkTest { } // 3. Decode S2GraphSource to parse HFile - val metaAndHBaseArgs = options.toConfigParams + val metaAndHBaseArgs = { + TaskConf.parseHBaseConfigs(bulkloadOptions) ++ + TaskConf.parseMetaStoreConfigs(bulkloadOptions) + }.mapValues(_.toString) + val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration) + import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._ val dumpArgs = Map( - "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"), - "restore.path" -> "/tmp/hbase_restore", - "hbase.table.names" -> s"${snapshotTableName}", - "hbase.table.cf" -> columnFamily, - "element.type" -> elementType + S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR -> hbaseConfig.get("hbase.rootdir"), + S2_SOURCE_BULKLOAD_RESTORE_PATH -> "/tmp/hbase_restore", + S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES -> s"${snapshotTableName}", + S2_SOURCE_BULKLOAD_HBASE_TABLE_CF -> columnFamily, + S2_SOURCE_ELEMENT_TYPE -> elementType ) ++ metaAndHBaseArgs val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)