add continuous trigger option
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/29a4ed9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/29a4ed9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/29a4ed9f

Branch: refs/heads/master
Commit: 29a4ed9fe4e0768a86511086fc3c949fcab47e1c
Parents: 488a56d
Author: Chul Kang <el...@apache.org>
Authored: Fri Jun 8 00:14:59 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Fri Jun 8 00:14:59 2018 +0900

----------------------------------------------------------------------
 s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/29a4ed9f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index f7c72cc..720c2d7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -69,6 +69,8 @@ abstract class Sink(queryName: String, override val conf: TaskConf) extends Task
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+    val isContinuous = conf.options.getOrElse("isContinuous", "false").toBoolean
+    val trigger = if (isContinuous) Trigger.Continuous(interval) else Trigger.ProcessingTime(interval)
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
@@ -78,7 +80,7 @@ abstract class Sink(queryName: String, override val conf: TaskConf) extends Task
       .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
-      .trigger(Trigger.ProcessingTime(interval))
+      .trigger(trigger)
       .outputMode(mode)
       .start()
   }