Repository: incubator-s2graph Updated Branches: refs/heads/master 66b656f69 -> 5c42b1cf6
add udfs option Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d67adc5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d67adc5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d67adc5e Branch: refs/heads/master Commit: d67adc5ecb43a4b9c5c9961c44330dd72250b058 Parents: c30531b Author: Chul Kang <el...@apache.org> Authored: Thu Jun 21 12:03:03 2018 +0900 Committer: Chul Kang <el...@apache.org> Committed: Thu Jun 21 12:20:41 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/s2jobs/JobDescription.scala | 8 ++++++-- .../org/apache/s2graph/s2jobs/JobLauncher.scala | 8 ++++++++ .../org/apache/s2graph/s2jobs/task/Sink.scala | 20 ++++++++++---------- .../org/apache/s2graph/s2jobs/udfs/Udf.scala | 14 ++++++++++++++ 4 files changed, 38 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala index 9a529aa..0943056 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala @@ -21,28 +21,32 @@ package org.apache.s2graph.s2jobs import play.api.libs.json.{JsValue, Json} import org.apache.s2graph.s2jobs.task._ +import org.apache.s2graph.s2jobs.udfs.UdfOption case class JobDescription( name:String, + udfs: Seq[UdfOption], sources:Seq[Source], processes:Seq[task.Process], sinks:Seq[Sink] ) object JobDescription extends Logger { - val dummy = JobDescription("dummy", Nil, Nil, Nil) + val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil) def apply(jsVal:JsValue):JobDescription = { implicit val TaskConfReader = Json.reads[TaskConf] + implicit val UdfOptionReader = Json.reads[UdfOption] logger.debug(s"JobDescription: ${jsVal}") val jobName = (jsVal \ "name").as[String] + val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil) val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf)) val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf)) val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf)) - JobDescription(jobName, sources, processes, sinks) + JobDescription(jobName, udfs, sources, processes, sinks) } def getSource(conf:TaskConf):Source = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala index a64a399..0a76274 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.s2jobs +import org.apache.s2graph.s2jobs.udfs.Udf import org.apache.spark.sql.SparkSession import play.api.libs.json.{JsValue, Json} @@ -82,6 +83,13 @@ object JobLauncher extends Logger { .enableHiveSupport() .getOrCreate() + // register udfs + jobDescription.udfs.foreach{ udfOption => + val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf] + logger.info((s"[udf register] ${udfOption}")) + udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty)) + } + val job = new Job(ss, jobDescription) job.run() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 4de585c..9c07668 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -20,22 +20,17 @@ package org.apache.s2graph.s2jobs.task import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} -import org.apache.hadoop.hbase.HBaseConfiguration -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -import org.apache.hadoop.util.ToolRunner -import org.apache.s2graph.core.{GraphUtil, Management} +import org.apache.s2graph.core.GraphUtil import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer} +import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.spark.sql.streaming.S2SinkContext import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import org.elasticsearch.spark.sql.EsSparkSQL -import scala.collection.mutable.ListBuffer -import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} /** * Sink @@ -212,9 +207,10 @@ class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { * @param conf */ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { - import scala.collection.JavaConversions._ - import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ import org.apache.s2graph.core.S2GraphConfigs._ + import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ + + import scala.collection.JavaConversions._ override def mandatoryOptions: Set[String] = Set() @@ -261,6 +257,10 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con } private def writeBatchWithMutate(df:DataFrame):Unit = { + import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ + + import scala.collection.JavaConversions._ + // TODO: FIX THIS. overwrite local cache config. val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf) val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load()) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala new file mode 100644 index 0000000..821527c --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala @@ -0,0 +1,14 @@ +package org.apache.s2graph.s2jobs.udfs + +import org.apache.s2graph.s2jobs.Logger +import org.apache.spark.sql.SparkSession + +case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None) +trait Udf extends Serializable with Logger { + def register(ss: SparkSession, name:String, options:Map[String, String]) +} + + + + +