add continuous trigger option

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/29a4ed9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/29a4ed9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/29a4ed9f

Branch: refs/heads/master
Commit: 29a4ed9fe4e0768a86511086fc3c949fcab47e1c
Parents: 488a56d
Author: Chul Kang <el...@apache.org>
Authored: Fri Jun 8 00:14:59 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Fri Jun 8 00:14:59 2018 +0900

----------------------------------------------------------------------
 s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/29a4ed9f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index f7c72cc..720c2d7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -69,6 +69,8 @@ abstract class Sink(queryName: String, override val conf: TaskConf) extends Task
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+    val isContinuous = conf.options.getOrElse("isContinuous", "false").toBoolean
+    val trigger = if (isContinuous) Trigger.Continuous(interval) else Trigger.ProcessingTime(interval)
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
@@ -78,7 +80,7 @@ abstract class Sink(queryName: String, override val conf: TaskConf) extends Task
       .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
-      .trigger(Trigger.ProcessingTime(interval))
+      .trigger(trigger)
       .outputMode(mode)
       .start()
   }

Reply via email to