Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 66b656f69 -> 5c42b1cf6


add udfs option


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d67adc5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d67adc5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d67adc5e

Branch: refs/heads/master
Commit: d67adc5ecb43a4b9c5c9961c44330dd72250b058
Parents: c30531b
Author: Chul Kang <el...@apache.org>
Authored: Thu Jun 21 12:03:03 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Thu Jun 21 12:20:41 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/JobDescription.scala  |  8 ++++++--
 .../org/apache/s2graph/s2jobs/JobLauncher.scala |  8 ++++++++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 20 ++++++++++----------
 .../org/apache/s2graph/s2jobs/udfs/Udf.scala    | 14 ++++++++++++++
 4 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 9a529aa..0943056 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,28 +21,32 @@ package org.apache.s2graph.s2jobs
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.udfs.UdfOption
 
 case class JobDescription(
                            name:String,
+                           udfs: Seq[UdfOption],
                            sources:Seq[Source],
                            processes:Seq[task.Process],
                            sinks:Seq[Sink]
                          )
 
 object JobDescription extends Logger {
-  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+  val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
 
   def apply(jsVal:JsValue):JobDescription = {
     implicit val TaskConfReader = Json.reads[TaskConf]
+    implicit val UdfOptionReader = Json.reads[UdfOption]
 
     logger.debug(s"JobDescription: ${jsVal}")
 
     val jobName = (jsVal \ "name").as[String]
+    val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
     val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
     val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
     val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
 
-    JobDescription(jobName, sources, processes, sinks)
+    JobDescription(jobName, udfs, sources, processes, sinks)
   }
 
   def getSource(conf:TaskConf):Source = {
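
For reference, a job description exercising the new "udfs" key could look like the sketch below. The field names follow UdfOption(name, `class`, params); the class path and the "locale" param are hypothetical, and "udfs" (like the task arrays) defaults to Nil when absent:

    import play.api.libs.json.Json

    // Hypothetical job description JSON exercising the optional "udfs" key.
    val jobJson = Json.parse(
      """
        |{
        |  "name": "sample-job",
        |  "udfs": [
        |    { "name": "to_upper",
        |      "class": "org.example.udfs.ToUpperUdf",
        |      "params": { "locale": "en" } }
        |  ],
        |  "source": [],
        |  "process": [],
        |  "sink": []
        |}
      """.stripMargin)
    val desc = JobDescription(jobJson)  // desc.udfs == Seq(UdfOption("to_upper", ...))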

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
index a64a399..0a76274 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.s2jobs
 
+import org.apache.s2graph.s2jobs.udfs.Udf
 import org.apache.spark.sql.SparkSession
 import play.api.libs.json.{JsValue, Json}
 
@@ -82,6 +83,13 @@ object JobLauncher extends Logger {
       .enableHiveSupport()
       .getOrCreate()
 
+    // register udfs
+    jobDescription.udfs.foreach{ udfOption =>
+      val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
+      logger.info(s"[udf register] ${udfOption}")
+      udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
+    }
+
     val job = new Job(ss, jobDescription)
     job.run()
   }
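
Because the launcher creates each UDF via Class.forName(...).newInstance(), implementations must provide a public no-arg constructor. For a single (hypothetical) entry, the reflective loop above boils down to:

    // Hand-written equivalent of one loop iteration; the class path is hypothetical.
    val udf = new org.example.udfs.ToUpperUdf()  // requires a public no-arg constructor
    udf.register(ss, "to_upper", Map.empty[String, String])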

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 4de585c..9c07668 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,22 +20,17 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{GraphUtil, Management}
+import org.apache.s2graph.core.GraphUtil
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
   * Sink
@@ -212,9 +207,10 @@ class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
-  import scala.collection.JavaConversions._
-  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
   import org.apache.s2graph.core.S2GraphConfigs._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+  import scala.collection.JavaConversions._
 
   override def mandatoryOptions: Set[String] = Set()
 
@@ -261,6 +257,10 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    import scala.collection.JavaConversions._
+
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d67adc5e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
new file mode 100644
index 0000000..821527c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.sql.SparkSession
+
+case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None)
+trait Udf extends Serializable with Logger {
+  def register(ss: SparkSession, name:String, options:Map[String, String])
+}
+
+
+
+
+
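
A minimal concrete implementation of the new trait might look like this sketch (package and class name are hypothetical; the options map is ignored here):

    package org.example.udfs  // hypothetical, not part of this commit

    import org.apache.s2graph.s2jobs.udfs.Udf
    import org.apache.spark.sql.SparkSession

    // Registers a plain Spark SQL UDF that upper-cases a string column.
    // The public no-arg constructor is what JobLauncher's
    // Class.forName(...).newInstance() call relies on.
    class ToUpperUdf extends Udf {
      override def register(ss: SparkSession, name: String, options: Map[String, String]): Unit = {
        ss.udf.register(name, (s: String) => if (s == null) null else s.toUpperCase)
      }
    }

Once registered, the function is callable from any SQL the job runs afterwards, e.g. ss.sql("SELECT to_upper(user_id) FROM events").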
